be in separate files. This way, adding a new set of modules would require
shipping extra data_structures* files instead of modifying the existing one.
For this to work, Event.type couldn't be an enum, it could be an int and use
-#defines or constants defined the specialized data_structures* files.
+#defines or constants defined in the specialized data_structures* files.
Event.event could be a void*.
++ Stage 2: Event matching
mix) and it will influence the overall behavior of the module.
eg. TCP, UDP, MPI
-matchEvent() takes an Event pointer. An actual matching module doesn't have
-to be able to process every type of event. It has to check that the passed
-event is of a type it can process.
+matchEvent() takes an Event pointer. An actual matching module doesn't have to
+be able to process every type of event. It will only be passed events of a
+type it can process (according to the .canMatch field of its MatchingModule
+struct).
++ Communication between stages 2 and 3: event groups
Communication consists of events grouped in Message, Exchange or Broadcast
{
free((ConnectionKey*) data);
}
+
+
+/*
+ * A GHashFunc for g_hash_table_new()
+ *
+ * Args:
+ * key DatagramKey*
+ */
+guint ghfDatagramKeyHash(gconstpointer key)
+{
+ DatagramKey* datagramKey;
+ uint32_t a, b, c;
+
+ datagramKey= (DatagramKey*) key;
+
+ a= datagramKey->saddr;
+ b= datagramKey->daddr;
+ c= datagramKey->source + (datagramKey->dest << 16);
+ mix(a, b, c);
+
+ a+= datagramKey->ulen; // 16 bits left here
+ b+= *((uint32_t*) datagramKey->dataKey);
+ c+= *((uint32_t*) ((void*) datagramKey->dataKey + 4));
+ final(a, b, c);
+
+ return c;
+}
+
+
+/*
+ * A GEqualFunc for g_hash_table_new()
+ *
+ * Args:
+ * a, b DatagramKey*
+ *
+ * Returns:
+ * TRUE if both values are equal
+ */
+gboolean gefDatagramKeyEqual(gconstpointer a, gconstpointer b)
+{
+ const DatagramKey* dA, * dB;
+
+ dA= (DatagramKey*) a;
+ dB= (DatagramKey*) b;
+
+ if (dA->saddr == dB->saddr && dA->daddr == dB->daddr &&
+ dA->source == dB->source && dA->dest == dB->dest &&
+ dA->ulen == dB->ulen &&
+ memcmp(dA->dataKey, dB->dataKey, sizeof(dA->dataKey)) == 0)
+ {
+ return TRUE;
+ }
+ else
+ {
+ return FALSE;
+ }
+}
+
+
+/*
+ * A GDestroyNotify function for g_hash_table_new_full()
+ *
+ * Args:
+ * data: DatagramKey*
+ */
+void gdnDestroyDatagramKey(gpointer data)
+{
+ free((DatagramKey*) data);
+}
+
+
+/*
+ * A GDestroyNotify function for g_hash_table_new_full()
+ *
+ * Args:
+ * data: Broadcast*
+ */
+void gdnDestroyBroadcast(gpointer data)
+{
+ destroyBroadcast((Broadcast*) data);
+}
+
+
+/*
+ * Free a Broadcast struct and its associated ressources
+ *
+ * Args:
+ * broadcast: Broadcast*
+ */
+void destroyBroadcast(Broadcast* const broadcast)
+{
+ g_queue_foreach(broadcast->events, &gfDestroyEvent, NULL);
+ g_queue_clear(broadcast->events);
+ free(broadcast);
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Args:
+ * data Event*
+ * user_data NULL
+ */
+void gfDestroyEvent(gpointer data, gpointer user_data)
+{
+ Event* event= data;
+
+ event->destroy(event);
+}
IN,
};
+enum EventType
+{
+ TCP,
+ UDP,
+ TYPE_COUNT,
+};
+
typedef struct
{
enum Direction direction;
typedef struct
{
uint32_t saddr, daddr;
+ uint16_t source, dest;
+ uint16_t ulen;
uint8_t dataKey[8];
} DatagramKey;
// specific event structures and functions could be in separate files and
// type could be an int
- enum {TCP, UDP} type;
+ enum EventType type;
// event could be a void*, this union is to avoid having to cast
union {
TCPEvent* tcpEvent;
void destroyEvent(Event* const event);
void destroyTCPEvent(Event* const event);
void destroyUDPEvent(Event* const event);
+void gfDestroyEvent(gpointer data, gpointer user_data);
// Message-related functions
void printTCPSegment(const Message* const segment);
// Exchange-related functions
void destroyTCPExchange(Exchange* const exchange);
+
+// Broadcast-related functions
+void gdnDestroyBroadcast(gpointer data);
+void destroyBroadcast(Broadcast* const broadcast);
+
#endif
typedef struct
{
char* name;
+ bool canMatch[TYPE_COUNT];
void (*initMatching)(struct _SyncState* const syncState);
void (*destroyMatching)(struct _SyncState* const syncState);
static MatchingModule matchingModuleTCP = {
.name= "TCP",
+ .canMatch[TCP]= true,
+ .canMatch[UDP]= false,
.initMatching= &initMatchingTCP,
.destroyMatching= &destroyMatchingTCP,
.matchEvent= &matchEventTCP,
* Args:
* syncState container for synchronization data.
* event new event to match
- * eventType type of event to match
*/
static void matchEventTCP(SyncState* const syncState, Event* const event)
{
MatchingDataTCP* matchingData;
+ g_assert(event->type == TCP);
+
matchingData= (MatchingDataTCP*) syncState->matchingData;
if (event->event.tcpEvent->direction == IN)
#include <config.h>
#endif
+#include "data_structures.h"
#include "event_processing_lttng_common.h"
void createQuarks()
{
LTT_CHANNEL_NET= g_quark_from_static_string("net");
- LTT_CHANNEL_NETIF_STATE= g_quark_from_static_string("netif_state");
LTT_EVENT_DEV_XMIT_EXTENDED= g_quark_from_static_string("dev_xmit_extended");
LTT_EVENT_DEV_RECEIVE= g_quark_from_static_string("dev_receive");
LTT_EVENT_TCPV4_RCV_EXTENDED= g_quark_from_static_string("tcpv4_rcv_extended");
- LTT_EVENT_NETWORK_IPV4_INTERFACE=
- g_quark_from_static_string("network_ipv4_interface");
+ LTT_EVENT_UDPV4_RCV_EXTENDED= g_quark_from_static_string("udpv4_rcv_extended");
LTT_FIELD_SKB= g_quark_from_static_string("skb");
LTT_FIELD_PROTOCOL= g_quark_from_static_string("protocol");
LTT_FIELD_RST= g_quark_from_static_string("rst");
LTT_FIELD_SYN= g_quark_from_static_string("syn");
LTT_FIELD_FIN= g_quark_from_static_string("fin");
- LTT_FIELD_NAME= g_quark_from_static_string("name");
- LTT_FIELD_ADDRESS= g_quark_from_static_string("address");
- LTT_FIELD_UP= g_quark_from_static_string("up");
+ LTT_FIELD_UNICAST= g_quark_from_static_string("unicast");
+ LTT_FIELD_ULEN= g_quark_from_static_string("ulen");
+ LTT_FIELD_DATA_START= g_quark_from_static_string("data_start");
}
* hookFunction: call back function when event is encountered
* hookData: data that will be made accessible to hookFunction in
* arg0->hook_data
+ * eventTypes: types of events for which to register hooks
*/
void registerHooks(GArray* hookListList, LttvTracesetContext* const
- traceSetContext, LttvHook hookFunction, gpointer hookData)
+ traceSetContext, LttvHook hookFunction, gpointer hookData, const bool
+ const* eventTypes)
{
unsigned int i, j, k;
unsigned int traceNb= lttv_traceset_number(traceSetContext->ts);
GQuark channelName;
GQuark eventName;
GQuark* fields;
+ bool eventTypes[TYPE_COUNT];
} eventHookInfoList[] = {
{
.channelName= LTT_CHANNEL_NET,
LTT_FIELD_SOURCE, LTT_FIELD_DEST, LTT_FIELD_SEQ,
LTT_FIELD_ACK_SEQ, LTT_FIELD_DOFF, LTT_FIELD_ACK,
LTT_FIELD_RST, LTT_FIELD_SYN, LTT_FIELD_FIN),
+ .eventTypes[TCP]= true,
+ .eventTypes[UDP]= true,
}, {
.channelName= LTT_CHANNEL_NET,
.eventName= LTT_EVENT_DEV_RECEIVE,
.fields= FIELD_ARRAY(LTT_FIELD_SKB, LTT_FIELD_PROTOCOL),
+ .eventTypes[TCP]= true,
+ .eventTypes[UDP]= true,
}, {
.channelName= LTT_CHANNEL_NET,
.eventName= LTT_EVENT_TCPV4_RCV_EXTENDED,
LTT_FIELD_SOURCE, LTT_FIELD_DEST, LTT_FIELD_SEQ,
LTT_FIELD_ACK_SEQ, LTT_FIELD_DOFF, LTT_FIELD_ACK,
LTT_FIELD_RST, LTT_FIELD_SYN, LTT_FIELD_FIN),
+ .eventTypes[TCP]= true,
+ .eventTypes[UDP]= false,
}, {
- .channelName= LTT_CHANNEL_NETIF_STATE,
- .eventName= LTT_EVENT_NETWORK_IPV4_INTERFACE,
- .fields= FIELD_ARRAY(LTT_FIELD_NAME, LTT_FIELD_ADDRESS,
- LTT_FIELD_UP),
+ .channelName= LTT_CHANNEL_NET,
+ .eventName= LTT_EVENT_UDPV4_RCV_EXTENDED,
+ .fields= FIELD_ARRAY(LTT_FIELD_SKB, LTT_FIELD_SADDR,
+ LTT_FIELD_DADDR, LTT_FIELD_UNICAST, LTT_FIELD_ULEN,
+ LTT_FIELD_DATA_START),
+ .eventTypes[TCP]= false,
+ .eventTypes[UDP]= true,
}
}; // This is called a compound literal
unsigned int hookNb= sizeof(eventHookInfoList) / sizeof(*eventHookInfoList);
for (j= 0; j < hookNb; j++)
{
guint old_len;
+ bool registerHook;
+
+ registerHook= true;
+ for (k= 0; k < TYPE_COUNT; k++)
+ {
+ if (eventTypes[k] && eventHookInfoList[j].eventTypes[k] == false)
+ {
+ registerHook= false;
+ break;
+ }
+ }
+ if (!registerHook)
+ {
+ continue;
+ }
old_len= hookList->len;
retval= lttv_trace_find_hook(tc->t,
GQuark
- LTT_CHANNEL_NET,
- LTT_CHANNEL_NETIF_STATE;
+ LTT_CHANNEL_NET;
GQuark
LTT_EVENT_DEV_XMIT_EXTENDED,
LTT_EVENT_DEV_RECEIVE,
LTT_EVENT_TCPV4_RCV_EXTENDED,
- LTT_EVENT_NETWORK_IPV4_INTERFACE;
+ LTT_EVENT_UDPV4_RCV_EXTENDED;
GQuark
LTT_FIELD_SKB,
LTT_FIELD_RST,
LTT_FIELD_SYN,
LTT_FIELD_FIN,
- LTT_FIELD_NAME,
- LTT_FIELD_ADDRESS,
- LTT_FIELD_UP;
+ LTT_FIELD_UNICAST,
+ LTT_FIELD_ULEN,
+ LTT_FIELD_DATA_START;
void createQuarks();
void registerHooks(GArray* hookListList, LttvTracesetContext* const
- traceSetContext, LttvHook hookFunction, gpointer
- hookData);
+ traceSetContext, LttvHook hookFunction, gpointer hookData, const bool
+ const* eventTypes);
void unregisterHooks(GArray* hookListList, LttvTracesetContext* const
traceSetContext);
sizeof(GArray*), syncState->traceNb);
registerHooks(processingData->hookListList, traceSetContext,
- &processEventLTTVNull, syncState);
+ &processEventLTTVNull, syncState,
+ syncState->matchingModule->canMatch);
}
#include <netinet/in.h>
#include <stdint.h>
#include <stdlib.h>
+#include <string.h>
#include "sync_chain_lttv.h"
#include "event_processing_lttng_common.h"
}
registerHooks(processingData->hookListList, traceSetContext,
- &processEventLTTVStandard, syncState);
+ &processEventLTTVStandard, syncState,
+ syncState->matchingModule->canMatch);
}
printf("\treceived frames: %d\n", processingData->stats->totRecv);
printf("\treceived frames that are IP: %d\n",
processingData->stats->totRecvIp);
- printf("\treceived and processed packets that are TCP: %d\n",
- processingData->stats->totInE);
- printf("\tsent packets that are TCP: %d\n",
- processingData->stats->totOutE);
+ if (syncState->matchingModule->canMatch[TCP])
+ {
+ printf("\treceived and processed packets that are TCP: %d\n",
+ processingData->stats->totRecvTCP);
+ }
+ if (syncState->matchingModule->canMatch[UDP])
+ {
+ printf("\treceived and processed packets that are UDP: %d\n",
+ processingData->stats->totRecvUDP);
+ }
+ if (syncState->matchingModule->canMatch[TCP])
+ {
+ printf("\tsent packets that are TCP: %d\n",
+ processingData->stats->totOutE);
+ }
if (syncState->matchingModule->printMatchingStats != NULL)
{
return FALSE;
}
+ if (!syncState->matchingModule->canMatch[TCP])
+ {
+ return FALSE;
+ }
+
if (syncState->stats)
{
processingData->stats->totOutE++;
if (inE == NULL)
{
// This should only happen in case of lost events
- g_debug("No matching pending receive event found\n");
+ g_warning("No matching pending receive event found");
}
else
{
if (syncState->stats)
{
- processingData->stats->totInE++;
+ processingData->stats->totRecvTCP++;
}
// If it's there, remove it and proceed with a receive event
syncState->matchingModule->matchEvent(syncState, inE);
- g_debug("Input event %p for skb %p done\n", inE, skb);
+ g_debug("TCP input event %p for skb %p done\n", inE, skb);
}
}
- else if (info->name == LTT_EVENT_NETWORK_IPV4_INTERFACE)
+ else if (info->name == LTT_EVENT_UDPV4_RCV_EXTENDED)
{
- char* name;
- guint64 address;
- gint64 up;
- char addressString[17];
+ Event* inE;
+ void* skb;
- address= ltt_event_get_long_unsigned(event,
- lttv_trace_get_hook_field(traceHook, 1));
- up= ltt_event_get_long_int(event, lttv_trace_get_hook_field(traceHook,
- 2));
- /* name must be the last field to get or else copy the string, see the
- * doc for ltt_event_get_string()
- */
- name= ltt_event_get_string(event, lttv_trace_get_hook_field(traceHook,
- 0));
+ // Search pendingRecv for an event with the same skb
+ skb= (void*) (long) ltt_event_get_long_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 0));
- convertIP(addressString, address);
+ inE= (Event*)
+ g_hash_table_lookup(processingData->pendingRecv[traceNum], skb);
+ if (inE == NULL)
+ {
+ // This should only happen in case of lost events
+ g_warning("No matching pending receive event found");
+ }
+ else
+ {
+ guint64 dataStart;
- g_debug("name \"%s\" address %s up %lld\n", name, addressString, up);
+ if (syncState->stats)
+ {
+ processingData->stats->totRecvUDP++;
+ }
+
+ // If it's there, remove it and proceed with a receive event
+ g_hash_table_steal(processingData->pendingRecv[traceNum], skb);
+
+ inE->type= UDP;
+ inE->event.udpEvent= malloc(sizeof(UDPEvent));
+ inE->destroy= &destroyUDPEvent;
+ inE->event.udpEvent->direction= IN;
+ inE->event.udpEvent->datagramKey= malloc(sizeof(DatagramKey));
+ inE->event.udpEvent->datagramKey->saddr=
+ ltt_event_get_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 1));
+ inE->event.udpEvent->datagramKey->daddr=
+ ltt_event_get_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 2));
+ inE->event.udpEvent->unicast= ltt_event_get_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 3)) == 0 ? false : true;
+ inE->event.udpEvent->datagramKey->ulen=
+ ltt_event_get_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 4));
+ inE->event.udpEvent->datagramKey->source=
+ ltt_event_get_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 5));
+ inE->event.udpEvent->datagramKey->dest=
+ ltt_event_get_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 6));
+ dataStart= ltt_event_get_long_unsigned(event,
+ lttv_trace_get_hook_field(traceHook, 7));
+ g_assert_cmpuint(sizeof(inE->event.udpEvent->datagramKey->dataKey),
+ ==, sizeof(guint64));
+ if (inE->event.udpEvent->datagramKey->ulen - 8 >=
+ sizeof(inE->event.udpEvent->datagramKey->dataKey))
+ {
+ memcpy(inE->event.udpEvent->datagramKey->dataKey, &dataStart,
+ sizeof(inE->event.udpEvent->datagramKey->dataKey));
+ }
+ else
+ {
+ memset(inE->event.udpEvent->datagramKey->dataKey, 0,
+ sizeof(inE->event.udpEvent->datagramKey->dataKey));
+ memcpy(inE->event.udpEvent->datagramKey->dataKey, &dataStart,
+ inE->event.udpEvent->datagramKey->ulen - 8);
+ }
+
+ syncState->matchingModule->matchEvent(syncState, inE);
+
+ g_debug("UDP input event %p for skb %p done\n", inE, skb);
+ }
}
else
{
{
int totRecv,
totRecvIp,
- totInE,
+ totRecvTCP,
+ totRecvUDP,
totOutE;
} ProcessingStatsLTTVStandard;
syncState->processingModule= (ProcessingModule*) result->data;
graphsStream= NULL;
- if (syncState->graphs)
+ if (syncState->graphs &&
+ syncState->processingModule->writeProcessingGraphsPlots != NULL)
{
// Create the graph directory right away in case the module initialization
// functions have something to write in it.
cwd= changeToGraphDir(syncState->graphs);
- if (syncState->processingModule->writeProcessingGraphsPlots != NULL)
+ if ((graphsFp= open("graphs.gnu", O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR |
+ S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH
+ | S_IWOTH | S_IXOTH)) == -1)
{
- if ((graphsFp= open("graphs.gnu", O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR |
- S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH
- | S_IWOTH | S_IXOTH)) == -1)
- {
- g_error(strerror(errno));
- }
- if ((graphsStream= fdopen(graphsFp, "w")) == NULL)
- {
- g_error(strerror(errno));
- }
+ g_error(strerror(errno));
+ }
+ if ((graphsStream= fdopen(graphsFp, "w")) == NULL)
+ {
+ g_error(strerror(errno));
}
retval= chdir(cwd);
free(cwd);
}
- syncState->processingModule->initProcessing(syncState, traceSetContext);
+ // Identify matching and analysis modules
+ g_assert(g_queue_get_length(&matchingModules) == 1);
+ syncState->matchingModule= (MatchingModule*)
+ g_queue_peek_head(&matchingModules);
- // Identify and initialize matching and analysis modules
- syncState->matchingData= NULL;
- syncState->analysisData= NULL;
- if (optionSyncNull)
+ result= g_queue_find_custom(&analysisModules, optionSyncAnalysis,
+ &gcfCompareAnalysis);
+ if (result != NULL)
{
- syncState->matchingModule= NULL;
- syncState->analysisModule= NULL;
+ syncState->analysisModule= (AnalysisModule*) result->data;
}
else
{
- g_assert(g_queue_get_length(&matchingModules) == 1);
- syncState->matchingModule= (MatchingModule*)
- g_queue_peek_head(&matchingModules);
- syncState->matchingModule->initMatching(syncState);
+ g_error("Analysis module '%s' not found", optionSyncAnalysis);
+ }
- result= g_queue_find_custom(&analysisModules, optionSyncAnalysis,
- &gcfCompareAnalysis);
- if (result != NULL)
- {
- syncState->analysisModule= (AnalysisModule*) result->data;
- syncState->analysisModule->initAnalysis(syncState);
- }
- else
- {
- g_error("Analysis module '%s' not found", optionSyncAnalysis);
- }
+ syncState->processingModule->initProcessing(syncState, traceSetContext);
+
+ syncState->matchingData= NULL;
+ syncState->analysisData= NULL;
+
+ if (!optionSyncNull)
+ {
+ syncState->matchingModule->initMatching(syncState);
+ syncState->analysisModule->initAnalysis(syncState);
}
// Process traceset