From: Benjamin Poirier Date: Tue, 3 Nov 2009 16:31:07 +0000 (-0500) Subject: Add a module to distribute messages to many analysis modules X-Git-Tag: v0.12.26~42 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=d4721e1a5216f34570d7e10257f85601cb3991bc;p=lttv.git Add a module to distribute messages to many analysis modules Also perform RTT analysis in the eval module. Signed-off-by: Benjamin Poirier --- diff --git a/lttv/lttv/Makefile.am b/lttv/lttv/Makefile.am index f81466f1..791bf989 100644 --- a/lttv/lttv/Makefile.am +++ b/lttv/lttv/Makefile.am @@ -60,11 +60,12 @@ lttv_real_SOURCES = \ sync/event_processing_lttng_common.c\ sync/event_processing_lttng_standard.c\ sync/event_processing_lttng_null.c\ - sync/event_matching_tcp.c\ sync/event_matching_broadcast.c\ - sync/event_analysis_linreg.c\ + sync/event_matching_distributor.c\ + sync/event_matching_tcp.c\ sync/event_analysis_chull.c\ - sync/event_analysis_eval.c + sync/event_analysis_eval.c\ + sync/event_analysis_linreg.c lttvinclude_HEADERS = \ attribute.h\ diff --git a/lttv/lttv/sync/Makefile.am b/lttv/lttv/sync/Makefile.am index 7c28a548..84562f98 100644 --- a/lttv/lttv/sync/Makefile.am +++ b/lttv/lttv/sync/Makefile.am @@ -7,7 +7,8 @@ unittest_SOURCES = \ unittest.c\ data_structures.c\ event_matching_broadcast.c\ + event_matching_distributor.c\ event_matching_tcp.c\ - event_analysis_linreg.c\ event_analysis_chull.c\ - event_analysis_eval.c + event_analysis_eval.c\ + event_analysis_linreg.c diff --git a/lttv/lttv/sync/data_structures.c b/lttv/lttv/sync/data_structures.c index d4c804b6..c4d4d966 100644 --- a/lttv/lttv/sync/data_structures.c +++ b/lttv/lttv/sync/data_structures.c @@ -99,15 +99,12 @@ bool isAcking(const Message* const ackSegment, const Message* const * Convert an IP address from 32 bit form to dotted quad * * Args: - * str: A preallocated string of length >= 17 + * str: A preallocated string of length >= 16 * addr: Address */ void convertIP(char* const str, const uint32_t addr) { - struct in_addr iaddr; - - iaddr.s_addr= htonl(addr); - strcpy(str, inet_ntoa(iaddr)); + strcpy(str, inet_ntoa((struct in_addr) {.s_addr= addr})); } @@ -116,7 +113,7 @@ void convertIP(char* const str, const uint32_t addr) */ void printTCPSegment(const Message* const segment) { - char saddr[17], daddr[17]; + char saddr[16], daddr[16]; SegmentKey* segmentKey; g_assert(segment->inE->type == TCP); @@ -329,6 +326,7 @@ void destroyTCPEvent(Event* const event) destroyEvent(event); } + /* * Free the memory used by a base Event */ @@ -545,7 +543,7 @@ void gdnDestroyBroadcast(gpointer data) void destroyBroadcast(Broadcast* const broadcast) { g_queue_foreach(broadcast->events, &gfDestroyEvent, NULL); - g_queue_clear(broadcast->events); + g_queue_free(broadcast->events); free(broadcast); } @@ -576,5 +574,69 @@ void gfDestroyEvent(gpointer data, gpointer user_data) */ double wallTimeSub(const WallTime const* tA, const WallTime const* tB) { - return tA->seconds - tB->seconds + (tA->nanosec - tB->nanosec) / 1e9; + return (double) tA->seconds - tB->seconds + ((double) tA->nanosec - tB->nanosec) / 1e9; +} + + +/* + * Allocate and copy a base event + * + * Args: + * newEvent: new event, pointer will be updated + * event: event to copy + */ +void copyEvent(const Event* const event, Event** const newEvent) +{ + g_assert(event->event.tcpEvent == NULL); + + *newEvent= malloc(sizeof(Event)); + memcpy(*newEvent, event, sizeof(Event)); +} + + +/* + * Allocate and copy a TCP event + * + * Args: + * newEvent: new event, pointer will be updated + * event: event to copy + */ +void copyTCPEvent(const Event* const event, Event** const newEvent) +{ + g_assert(event->type == TCP); + + *newEvent= malloc(sizeof(Event)); + memcpy(*newEvent, event, sizeof(Event)); + + (*newEvent)->event.tcpEvent= malloc(sizeof(TCPEvent)); + memcpy((*newEvent)->event.tcpEvent, event->event.tcpEvent, + sizeof(TCPEvent)); + + (*newEvent)->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey)); + memcpy((*newEvent)->event.tcpEvent->segmentKey, + event->event.tcpEvent->segmentKey, sizeof(SegmentKey)); +} + + +/* + * Allocate and copy a UDP event + * + * Args: + * newEvent: new event, pointer will be updated + * event: event to copy + */ +void copyUDPEvent(const Event* const event, Event** const newEvent) +{ + g_assert(event->type == UDP); + + *newEvent= malloc(sizeof(Event)); + memcpy(*newEvent, event, sizeof(Event)); + + (*newEvent)->event.udpEvent= malloc(sizeof(UDPEvent)); + memcpy((*newEvent)->event.udpEvent, event->event.udpEvent, + sizeof(UDPEvent)); + + (*newEvent)->event.udpEvent->datagramKey= malloc(sizeof(DatagramKey)); + memcpy((*newEvent)->event.udpEvent->datagramKey, + event->event.udpEvent->datagramKey, sizeof(DatagramKey)); } diff --git a/lttv/lttv/sync/data_structures.h b/lttv/lttv/sync/data_structures.h index 1f70ade4..b6bc8980 100644 --- a/lttv/lttv/sync/data_structures.h +++ b/lttv/lttv/sync/data_structures.h @@ -100,6 +100,7 @@ typedef struct _Event UDPEvent* udpEvent; } event; + void (*copy)(const struct _Event* const event, struct _Event** const newEvent); void (*destroy)(struct _Event* const event); } Event; @@ -149,6 +150,9 @@ void gdnDestroyDatagramKey(gpointer data); // Event-related functions void gdnDestroyEvent(gpointer data); +void copyEvent(const Event* const event, Event** const newEvent); +void copyTCPEvent(const Event* const event, Event** const newEvent); +void copyUDPEvent(const Event* const event, Event** const newEvent); void destroyEvent(Event* const event); void destroyTCPEvent(Event* const event); void destroyUDPEvent(Event* const event); diff --git a/lttv/lttv/sync/event_analysis_eval.c b/lttv/lttv/sync/event_analysis_eval.c index 49f3e177..cfe387a9 100644 --- a/lttv/lttv/sync/event_analysis_eval.c +++ b/lttv/lttv/sync/event_analysis_eval.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _ISOC99_SOURCE #ifdef HAVE_CONFIG_H #include @@ -62,6 +63,7 @@ static void positionStream(FILE* stream); static void gfSum(gpointer data, gpointer userData); static void gfSumSquares(gpointer data, gpointer userData); +static void ghfPrintExchangeRtt(gpointer key, gpointer value, gpointer user_data); static AnalysisModule analysisModuleEval= { @@ -147,6 +149,10 @@ static void initAnalysisEval(SyncState* const syncState) analysisData->stats->messageStats[i]= calloc(syncState->traceNb, sizeof(MessageStats)); } + + analysisData->stats->exchangeRtt= + g_hash_table_new_full(&ghfRttKeyHash, &gefRttKeyEqual, + &gdnDestroyRttKey, &gdnDestroyDouble); } } @@ -181,6 +187,9 @@ static void destroyAnalysisEval(SyncState* const syncState) free(analysisData->stats->messageStats[i]); } free(analysisData->stats->messageStats); + + g_hash_table_destroy(analysisData->stats->exchangeRtt); + free(analysisData->stats); } @@ -202,7 +211,7 @@ static void analyzeMessageEval(SyncState* const syncState, Message* const messag { AnalysisDataEval* analysisData; MessageStats* messageStats; - double* rttInfo; + double* rtt; double tt; struct RttKey rttKey; @@ -223,14 +232,19 @@ static void analyzeMessageEval(SyncState* const syncState, Message* const messag messageStats->inversionNb++; } - g_assert(message->inE->type == UDP); - rttKey.saddr= message->inE->event.udpEvent->datagramKey->saddr; - rttKey.daddr= message->inE->event.udpEvent->datagramKey->daddr; - rttInfo= g_hash_table_lookup(analysisData->rttInfo, &rttKey); + g_assert(message->inE->type == TCP); + rttKey.saddr= + message->inE->event.tcpEvent->segmentKey->connectionKey.saddr; + rttKey.daddr= + message->inE->event.tcpEvent->segmentKey->connectionKey.daddr; + rtt= g_hash_table_lookup(analysisData->rttInfo, &rttKey); + g_debug("rttInfo, looking up (%u, %u)->(%f)", rttKey.saddr, + rttKey.daddr, rtt ? *rtt : NAN); - if (rttInfo) + if (rtt) { - if (tt < *rttInfo / 2.) + g_debug("rttInfo, tt: %f rtt / 2: %f", tt, *rtt / 2.); + if (tt < *rtt / 2.) { messageStats->tooFastNb++; } @@ -253,9 +267,44 @@ static void analyzeMessageEval(SyncState* const syncState, Message* const messag */ static void analyzeExchangeEval(SyncState* const syncState, Exchange* const exchange) { - AnalysisDataEval* analysisData; + AnalysisDataEval* analysisData= syncState->analysisData; + Message* m1= g_queue_peek_tail(exchange->acks); + Message* m2= exchange->message; + struct RttKey* rttKey; + double* rtt, * exchangeRtt; - analysisData= (AnalysisDataEval*) syncState->analysisData; + if (!syncState->stats) + { + return; + } + + // (T2 - T1) - (T3 - T4) + rtt= malloc(sizeof(double)); + *rtt= wallTimeSub(&m1->inE->wallTime, &m1->outE->wallTime) - + wallTimeSub(&m2->outE->wallTime, &m2->inE->wallTime); + + g_assert(m1->inE->type == TCP); + rttKey= malloc(sizeof(struct RttKey)); + rttKey->saddr= + MIN(m1->inE->event.tcpEvent->segmentKey->connectionKey.saddr, + m1->inE->event.tcpEvent->segmentKey->connectionKey.daddr); + rttKey->daddr= + MAX(m1->inE->event.tcpEvent->segmentKey->connectionKey.saddr, + m1->inE->event.tcpEvent->segmentKey->connectionKey.daddr); + exchangeRtt= g_hash_table_lookup(analysisData->stats->exchangeRtt, + rttKey); + + if (exchangeRtt) + { + if (*rtt < *exchangeRtt) + { + g_hash_table_replace(analysisData->stats->exchangeRtt, rttKey, rtt); + } + } + else + { + g_hash_table_insert(analysisData->stats->exchangeRtt, rttKey, rtt); + } } @@ -305,7 +354,7 @@ static void analyzeBroadcastEval(SyncState* const syncState, Broadcast* const br * syncState container for synchronization data. * * Returns: - * Factors[traceNb] synchronization factors for each trace + * Factors[traceNb] identity factors for each trace */ static GArray* finalizeAnalysisEval(SyncState* const syncState) { @@ -375,6 +424,58 @@ static void printAnalysisStatsEval(SyncState* const syncState) messageStats->noRTTInfoNb, messageStats->total); } } + + printf("\tRound-trip times:\n" + "\t\tHost pair RTT from exchanges RTTs from file (ms)\n"); + g_hash_table_foreach(analysisData->stats->exchangeRtt, + &ghfPrintExchangeRtt, analysisData->rttInfo); +} + + +/* + * A GHFunc for g_hash_table_foreach() + * + * Args: + * key: RttKey* where saddr < daddr + * value: double*, RTT estimated from exchanges + * user_data GHashTable* rttInfo + */ +static void ghfPrintExchangeRtt(gpointer key, gpointer value, gpointer user_data) +{ + char addr1[16], addr2[16]; + struct RttKey* rttKey1= key; + struct RttKey rttKey2= {rttKey1->daddr, rttKey1->saddr}; + double* fileRtt1, *fileRtt2; + GHashTable* rttInfo= user_data; + + convertIP(addr1, rttKey1->saddr); + convertIP(addr2, rttKey1->daddr); + + fileRtt1= g_hash_table_lookup(rttInfo, rttKey1); + fileRtt2= g_hash_table_lookup(rttInfo, &rttKey2); + + printf("\t\t(%15s, %-15s) %-18.3f ", addr1, addr2, *(double*) value * 1e3); + + if (fileRtt1 || fileRtt2) + { + if (fileRtt1) + { + printf("%.3f", *fileRtt1 * 1e3); + } + if (fileRtt1 && fileRtt2) + { + printf(", "); + } + if (fileRtt2) + { + printf("%.3f", *fileRtt2 * 1e3); + } + } + else + { + printf("-"); + } + printf("\n"); } @@ -520,6 +621,8 @@ static void readRttInfo(GHashTable* rttInfo, FILE* rttStream) } *rtt/= 1e3; + g_debug("rttInfo, Inserting (%u, %u)->(%f)", rttKey->saddr, + rttKey->daddr, *rtt); g_hash_table_insert(rttInfo, rttKey, rtt); positionStream(rttStream); diff --git a/lttv/lttv/sync/event_analysis_eval.h b/lttv/lttv/sync/event_analysis_eval.h index 1515bec9..159ba615 100644 --- a/lttv/lttv/sync/event_analysis_eval.h +++ b/lttv/lttv/sync/event_analysis_eval.h @@ -24,6 +24,11 @@ #include "data_structures.h" +struct RttKey +{ + uint32_t saddr, daddr; +}; + typedef struct { unsigned int inversionNb, @@ -38,13 +43,12 @@ typedef struct unsigned int broadcastNb; MessageStats** messageStats; + /* double* exchangeRtt[RttKey] + * For this table, saddr and daddr are swapped as necessary such that + * saddr < daddr */ + GHashTable* exchangeRtt; } AnalysisStatsEval; -struct RttKey -{ - uint32_t saddr, daddr; -}; - typedef struct { // double* rttInfo[RttKey] diff --git a/lttv/lttv/sync/event_matching_broadcast.c b/lttv/lttv/sync/event_matching_broadcast.c index fbec98ec..32112014 100644 --- a/lttv/lttv/sync/event_matching_broadcast.c +++ b/lttv/lttv/sync/event_matching_broadcast.c @@ -198,13 +198,14 @@ static void matchEventBroadcast(SyncState* const syncState, Event* const event) matchingData->stats->totReceive++; } - // s'il est déjà dans pendingBroadcasts - // l'ajouter à son broadcast - // s'il y a traceNb éléments - // le retirer de pending et le livrer à analysis - // détruire le broadcast (et ses éléments) - // sinon - // créer un broadcast et l'ajouter à pending + /* if event in pendingBroadcasts: + * add it to its broadcast + * if this broadcast has traceNb events: + * remove it from pending and deliver it to analysis + * destroy the broadcast (and its elements) + * else: + * create a broadcast and add it to pending + */ result= g_hash_table_lookup_extended(matchingData->pendingBroadcasts, diff --git a/lttv/lttv/sync/event_matching_distributor.c b/lttv/lttv/sync/event_matching_distributor.c new file mode 100644 index 00000000..b9d74051 --- /dev/null +++ b/lttv/lttv/sync/event_matching_distributor.c @@ -0,0 +1,369 @@ +/* This file is part of the Linux Trace Toolkit viewer + * Copyright (C) 2009 Benjamin Poirier + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License Version 2 as + * published by the Free Software Foundation; + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, + * MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include + +#include "event_analysis.h" +#include "sync_chain.h" + +#include "event_matching_distributor.h" + + +struct InitAggregate +{ + SyncState* syncState; + GQueue* matchingModules; +}; + + +struct GraphAggregate +{ + /* Offset whithin Matching module of the field* containing the function + * pointer */ + size_t offset; + FILE* stream; + unsigned int i, j; +}; + + +// Functions common to all matching modules +static void initMatchingDistributor(SyncState* const syncState); +static void destroyMatchingDistributor(SyncState* const syncState); + +static void matchEventDistributor(SyncState* const syncState, Event* const + event); +static GArray* finalizeMatchingDistributor(SyncState* const syncState); +static void printMatchingStatsDistributor(SyncState* const syncState); +static void writeMatchingGraphsPlotsDistributor(FILE* stream, SyncState* const + syncState, const unsigned int i, const unsigned int j); +static void writeMatchingGraphsOptionsDistributor(FILE* stream, SyncState* + const syncState, const unsigned int i, const unsigned int j); + +// Functions specific to this module +static void registerMatchingDistributor() __attribute__((constructor (101))); + +void gfInitModule(gpointer data, gpointer user_data); +void gfDestroyModule(gpointer data, gpointer user_data); +void gfMatchEvent(gpointer data, gpointer user_data); +void gfFinalize(gpointer data, gpointer user_data); +void gfPrintStats(gpointer data, gpointer user_data); +void gfGraphFunctionCall(gpointer data, gpointer user_data); + + +static MatchingModule matchingModuleDistributor = { + .name= "distributor", + .canMatch[TCP]= true, + .canMatch[UDP]= true, + .initMatching= &initMatchingDistributor, + .destroyMatching= &destroyMatchingDistributor, + .matchEvent= &matchEventDistributor, + .finalizeMatching= &finalizeMatchingDistributor, + .printMatchingStats= &printMatchingStatsDistributor, + .writeMatchingGraphsPlots= &writeMatchingGraphsPlotsDistributor, + .writeMatchingGraphsOptions= &writeMatchingGraphsOptionsDistributor, +}; + + +/* + * Matching module registering function + */ +static void registerMatchingDistributor() +{ + g_queue_push_tail(&matchingModules, &matchingModuleDistributor); +} + + +/* + * Matching init function + * + * This function is called at the beginning of a synchronization run for a set + * of traces. + * + * Build the list and initialize other matching Modules + * + * Args: + * syncState container for synchronization data. + */ +static void initMatchingDistributor(SyncState* const syncState) +{ + MatchingDataDistributor* matchingData; + + matchingData= malloc(sizeof(MatchingDataDistributor)); + syncState->matchingData= matchingData; + + matchingData->distributedModules= g_queue_new(); + g_queue_foreach(&matchingModules, &gfInitModule, &(struct InitAggregate) + {syncState, matchingData->distributedModules}); +} + + +/* + * Matching destroy function + * + * Destroy other modules and free the matching specific data structures + * + * Args: + * syncState container for synchronization data. + */ +static void destroyMatchingDistributor(SyncState* const syncState) +{ + MatchingDataDistributor* matchingData= syncState->matchingData; + + g_queue_foreach(matchingData->distributedModules, &gfDestroyModule, NULL); + + g_queue_clear(matchingData->distributedModules); + free(syncState->matchingData); + syncState->matchingData= NULL; +} + + + +/* + * Copy event and distribute to matching modules + * + * Args: + * syncState container for synchronization data. + * event new event to match + */ +static void matchEventDistributor(SyncState* const syncState, Event* const event) +{ + MatchingDataDistributor* matchingData= syncState->matchingData; + + g_queue_foreach(matchingData->distributedModules, &gfMatchEvent, event); + event->destroy(event); +} + + +/* + * Call the distributed finalization functions and return identity factors + * + * Args: + * syncState container for synchronization data. + * + * Returns: + * Factors[traceNb] identity factors for each trace + */ +static GArray* finalizeMatchingDistributor(SyncState* const syncState) +{ + GArray* factors; + unsigned int i; + MatchingDataDistributor* matchingData= syncState->matchingData; + + g_queue_foreach(matchingData->distributedModules, &gfFinalize, NULL); + + factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors), + syncState->traceNb); + g_array_set_size(factors, syncState->traceNb); + for (i= 0; i < syncState->traceNb; i++) + { + Factors* e; + + e= &g_array_index(factors, Factors, i); + e->drift= 1.; + e->offset= 0.; + } + + return factors; +} + + +/* + * Call the distributed statistics functions (when they exist). Must be called + * after finalizeMatching. + * + * Args: + * syncState container for synchronization data. + */ +static void printMatchingStatsDistributor(SyncState* const syncState) +{ + MatchingDataDistributor* matchingData= syncState->matchingData; + + g_queue_foreach(matchingData->distributedModules, &gfPrintStats, NULL); +} + + +/* + * Call the distributed graph lines functions (when they exist). + * + * Args: + * stream: stream where to write the data + * syncState: container for synchronization data + * i: first trace number + * j: second trace number, garanteed to be larger than i + */ +static void writeMatchingGraphsPlotsDistributor(FILE* stream, SyncState* const + syncState, const unsigned int i, const unsigned int j) +{ + MatchingDataDistributor* matchingData= syncState->matchingData; + + g_queue_foreach(matchingData->distributedModules, &gfGraphFunctionCall, + &(struct GraphAggregate) {offsetof(MatchingModule, + writeMatchingGraphsPlots), stream, i, j}); +} + + +/* + * Call the distributed graph options functions (when they exist). + * + * Args: + * stream: stream where to write the data + * syncState: container for synchronization data + * i: first trace number + * j: second trace number, garanteed to be larger than i + */ +static void writeMatchingGraphsOptionsDistributor(FILE* stream, SyncState* + const syncState, const unsigned int i, const unsigned int j) +{ + MatchingDataDistributor* matchingData= syncState->matchingData; + + g_queue_foreach(matchingData->distributedModules, &gfGraphFunctionCall, + &(struct GraphAggregate) {offsetof(MatchingModule, + writeMatchingGraphsOptions), stream, i, j}); +} + + +/* + * A GFunc for g_queue_foreach() + * + * Add and initialize matching module + * + * Args: + * data MatchingModule*, module to add + * user_data InitAggregate* + */ +void gfInitModule(gpointer data, gpointer user_data) +{ + SyncState* parallelSS; + struct InitAggregate* aggregate= user_data; + MatchingModule* matchingModule= data; + + if (strcmp(matchingModule->name, matchingModuleDistributor.name) == 0) + { + return; + } + + parallelSS= malloc(sizeof(SyncState)); + memcpy(parallelSS, aggregate->syncState, sizeof(SyncState)); + g_queue_push_tail(aggregate->matchingModules, parallelSS); + + parallelSS->matchingModule= matchingModule; + parallelSS->matchingModule->initMatching(parallelSS); +} + + +/* + * A GFunc for g_queue_foreach() + * + * Destroy and remove matching module + * + * Args: + * data SyncState* containing the module to destroy + * user_data NULL + */ +void gfDestroyModule(gpointer data, gpointer user_data) +{ + SyncState* parallelSS= data; + + parallelSS->matchingModule->destroyMatching(parallelSS); + free(parallelSS); +} + + +/* + * A GFunc for g_queue_foreach() + * + * Args: + * data SyncState* containing the distributed matching module + * user_data Event* original event + */ +void gfMatchEvent(gpointer data, gpointer user_data) +{ + SyncState* parallelSS= data; + const Event* event= user_data; + Event* newEvent; + + if (parallelSS->matchingModule->canMatch[event->type]) + { + event->copy(event, &newEvent); + parallelSS->matchingModule->matchEvent(parallelSS, newEvent); + } +} + + +/* + * A GFunc for g_queue_foreach() + * + * Args: + * data SyncState* containing the distributed matching module + * user_data NULL + */ +void gfFinalize(gpointer data, gpointer user_data) +{ + GArray* factors; + SyncState* parallelSS= data; + + factors= parallelSS->matchingModule->finalizeMatching(parallelSS); + g_array_free(factors, TRUE); +} + + +/* + * A GFunc for g_queue_foreach() + * + * Args: + * data SyncState* containing the distributed matching module + * user_data NULL + */ +void gfPrintStats(gpointer data, gpointer user_data) +{ + SyncState* parallelSS= data; + + if (parallelSS->matchingModule->printMatchingStats != NULL) + { + parallelSS->matchingModule->printMatchingStats(parallelSS); + } +} + + +/* + * A GFunc for g_queue_foreach() + * + * Call a certain matching function + * + * Args: + * data SyncState* containing the distributed matching module + * user_data size_t, + */ +void gfGraphFunctionCall(gpointer data, gpointer user_data) +{ + SyncState* parallelSS= data; + struct GraphAggregate* aggregate= user_data; + void (*graphFunction)(FILE* , struct _SyncState*, const unsigned int, + const unsigned int)= (void*) data + (size_t) aggregate->offset; + + if (graphFunction != NULL) + { + graphFunction(aggregate->stream, parallelSS, aggregate->i, aggregate->j); + } +} diff --git a/lttv/lttv/sync/event_matching_distributor.h b/lttv/lttv/sync/event_matching_distributor.h new file mode 100644 index 00000000..b4399e5c --- /dev/null +++ b/lttv/lttv/sync/event_matching_distributor.h @@ -0,0 +1,34 @@ +/* This file is part of the Linux Trace Toolkit viewer + * Copyright (C) 2009 Benjamin Poirier + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License Version 2 as + * published by the Free Software Foundation; + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, + * MA 02111-1307, USA. + */ + +#ifndef EVENT_MATCHING_DISTRIBUTOR_H +#define EVENT_MATCHING_DISTRIBUTOR_H + +#include + +#include "data_structures.h" +#include "sync_chain.h" + + +typedef struct +{ + // SyncState* distributedModules[] + GQueue* distributedModules; +} MatchingDataDistributor; + +#endif diff --git a/lttv/lttv/sync/event_processing_lttng_common.c b/lttv/lttv/sync/event_processing_lttng_common.c index 8f1d2dcb..f25e0b61 100644 --- a/lttv/lttv/sync/event_processing_lttng_common.c +++ b/lttv/lttv/sync/event_processing_lttng_common.c @@ -147,12 +147,12 @@ void registerHooks(GArray* hookListList, LttvTracesetContext* const guint old_len; bool registerHook; - registerHook= true; + registerHook= false; for (k= 0; k < TYPE_COUNT; k++) { - if (eventTypes[k] && eventHookInfoList[j].eventTypes[k] == false) + if (eventTypes[k] && eventHookInfoList[j].eventTypes[k]) { - registerHook= false; + registerHook= true; break; } } diff --git a/lttv/lttv/sync/event_processing_lttng_standard.c b/lttv/lttv/sync/event_processing_lttng_standard.c index af1c93bb..b79ca4af 100644 --- a/lttv/lttv/sync/event_processing_lttng_standard.c +++ b/lttv/lttv/sync/event_processing_lttng_standard.c @@ -425,16 +425,17 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData) outE->cpuTime= tsc; outE->wallTime= wTime; outE->type= TCP; + outE->copy= ©TCPEvent; outE->destroy= &destroyTCPEvent; outE->event.tcpEvent= malloc(sizeof(TCPEvent)); outE->event.tcpEvent->direction= OUT; outE->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey)); outE->event.tcpEvent->segmentKey->connectionKey.saddr= - ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook, - 3)); + htonl(ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 3))); outE->event.tcpEvent->segmentKey->connectionKey.daddr= - ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook, - 4)); + htonl(ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 4))); outE->event.tcpEvent->segmentKey->tot_len= ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook, 5)); @@ -493,6 +494,7 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData) inE->cpuTime= tsc; inE->wallTime= wTime; inE->event.tcpEvent= NULL; + inE->copy= ©Event; inE->destroy= &destroyEvent; skb= (void*) (long) ltt_event_get_long_unsigned(event, @@ -535,15 +537,16 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData) inE->type= TCP; inE->event.tcpEvent= malloc(sizeof(TCPEvent)); + inE->copy= ©TCPEvent; inE->destroy= &destroyTCPEvent; inE->event.tcpEvent->direction= IN; inE->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey)); inE->event.tcpEvent->segmentKey->connectionKey.saddr= - ltt_event_get_unsigned(event, - lttv_trace_get_hook_field(traceHook, 1)); + htonl(ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 1))); inE->event.tcpEvent->segmentKey->connectionKey.daddr= - ltt_event_get_unsigned(event, - lttv_trace_get_hook_field(traceHook, 2)); + htonl(ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 2))); inE->event.tcpEvent->segmentKey->tot_len= ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook, 3)); @@ -613,15 +616,16 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData) inE->type= UDP; inE->event.udpEvent= malloc(sizeof(UDPEvent)); + inE->copy= ©UDPEvent; inE->destroy= &destroyUDPEvent; inE->event.udpEvent->direction= IN; inE->event.udpEvent->datagramKey= malloc(sizeof(DatagramKey)); inE->event.udpEvent->datagramKey->saddr= - ltt_event_get_unsigned(event, - lttv_trace_get_hook_field(traceHook, 1)); + htonl(ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 1))); inE->event.udpEvent->datagramKey->daddr= - ltt_event_get_unsigned(event, - lttv_trace_get_hook_field(traceHook, 2)); + htonl(ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 2))); inE->event.udpEvent->unicast= ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook, 3)) == 0 ? false : true; inE->event.udpEvent->datagramKey->ulen= diff --git a/lttv/lttv/sync/sync_chain_lttv.c b/lttv/lttv/sync/sync_chain_lttv.c index 9652f346..cd8532a9 100644 --- a/lttv/lttv/sync/sync_chain_lttv.c +++ b/lttv/lttv/sync/sync_chain_lttv.c @@ -206,24 +206,8 @@ void syncTraceset(LttvTracesetContext* const traceSetContext) syncState->graphs= NULL; } - // Identify and initialize processing module - syncState->processingData= NULL; - if (optionSyncNull.present) - { - result= g_queue_find_custom(&processingModules, "LTTV-null", - &gcfCompareProcessing); - } - else - { - result= g_queue_find_custom(&processingModules, "LTTV-standard", - &gcfCompareProcessing); - } - g_assert(result != NULL); - syncState->processingModule= (ProcessingModule*) result->data; - graphsStream= NULL; - if (syncState->graphs && - syncState->processingModule->writeProcessingGraphsPlots != NULL) + if (syncState->graphs) { char* cwd; int graphsFp; @@ -251,11 +235,27 @@ void syncTraceset(LttvTracesetContext* const traceSetContext) free(cwd); } - // Identify matching and analysis modules + // Identify and initialize modules + syncState->processingData= NULL; + if (optionSyncNull.present) + { + result= g_queue_find_custom(&processingModules, "LTTV-null", + &gcfCompareProcessing); + } + else + { + result= g_queue_find_custom(&processingModules, "LTTV-standard", + &gcfCompareProcessing); + } + g_assert(result != NULL); + syncState->processingModule= (ProcessingModule*) result->data; + + syncState->matchingData= NULL; result= g_queue_find_custom(&matchingModules, "TCP", &gcfCompareMatching); g_assert(result != NULL); syncState->matchingModule= (MatchingModule*) result->data; + syncState->analysisData= NULL; result= g_queue_find_custom(&analysisModules, optionSyncAnalysis.arg, &gcfCompareAnalysis); if (result != NULL) @@ -267,16 +267,12 @@ void syncTraceset(LttvTracesetContext* const traceSetContext) g_error("Analysis module '%s' not found", optionSyncAnalysis.arg); } - syncState->processingModule->initProcessing(syncState, traceSetContext); - - syncState->matchingData= NULL; - syncState->analysisData= NULL; - if (!optionSyncNull.present) { - syncState->matchingModule->initMatching(syncState); syncState->analysisModule->initAnalysis(syncState); + syncState->matchingModule->initMatching(syncState); } + syncState->processingModule->initProcessing(syncState, traceSetContext); // Process traceset lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero); diff --git a/lttv/modules/text/sync_chain_batch.c b/lttv/modules/text/sync_chain_batch.c index 731720c8..3dfc7287 100644 --- a/lttv/modules/text/sync_chain_batch.c +++ b/lttv/modules/text/sync_chain_batch.c @@ -303,14 +303,8 @@ void setupSyncChain(LttvTracesetContext* const traceSetContext) syncState->graphs= NULL; } - syncState->processingData= NULL; - result= g_queue_find_custom(&processingModules, "LTTV-standard", - &gcfCompareProcessing); - syncState->processingModule= (ProcessingModule*) result->data; - tracesetChainState->graphsStream= NULL; - if (syncState->graphs && - syncState->processingModule->writeProcessingGraphsPlots != NULL) + if (syncState->graphs) { char* cwd; int graphsFp; @@ -338,18 +332,22 @@ void setupSyncChain(LttvTracesetContext* const traceSetContext) free(cwd); } - syncState->matchingData= NULL; - result= g_queue_find_custom(&matchingModules, "broadcast", &gcfCompareMatching); - syncState->matchingModule= (MatchingModule*) result->data; - syncState->analysisData= NULL; result= g_queue_find_custom(&analysisModules, "eval", &gcfCompareAnalysis); syncState->analysisModule= (AnalysisModule*) result->data; + syncState->analysisModule->initAnalysis(syncState); - syncState->processingModule->initProcessing(syncState, traceSetContext); + syncState->matchingData= NULL; + result= g_queue_find_custom(&matchingModules, "broadcast", &gcfCompareMatching); + syncState->matchingModule= (MatchingModule*) result->data; syncState->matchingModule->initMatching(syncState); - syncState->analysisModule->initAnalysis(syncState); + + syncState->processingData= NULL; + result= g_queue_find_custom(&processingModules, "LTTV-standard", + &gcfCompareProcessing); + syncState->processingModule= (ProcessingModule*) result->data; + syncState->processingModule->initProcessing(syncState, traceSetContext); }