Add a module to distribute messages to many analysis modules
authorBenjamin Poirier <benjamin.poirier@polymtl.ca>
Tue, 3 Nov 2009 16:31:07 +0000 (11:31 -0500)
committerBenjamin Poirier <benjamin.poirier@polymtl.ca>
Fri, 18 Dec 2009 19:03:25 +0000 (14:03 -0500)
Also perform RTT analysis in the eval module.

Signed-off-by: Benjamin Poirier <benjamin.poirier@polymtl.ca>
13 files changed:
lttv/lttv/Makefile.am
lttv/lttv/sync/Makefile.am
lttv/lttv/sync/data_structures.c
lttv/lttv/sync/data_structures.h
lttv/lttv/sync/event_analysis_eval.c
lttv/lttv/sync/event_analysis_eval.h
lttv/lttv/sync/event_matching_broadcast.c
lttv/lttv/sync/event_matching_distributor.c [new file with mode: 0644]
lttv/lttv/sync/event_matching_distributor.h [new file with mode: 0644]
lttv/lttv/sync/event_processing_lttng_common.c
lttv/lttv/sync/event_processing_lttng_standard.c
lttv/lttv/sync/sync_chain_lttv.c
lttv/modules/text/sync_chain_batch.c

index f81466f17237613944bb4c114d70d87d68d4fe0a..791bf98977b1b483d3f93761cde630ef8597ac75 100644 (file)
@@ -60,11 +60,12 @@ lttv_real_SOURCES = \
        sync/event_processing_lttng_common.c\
        sync/event_processing_lttng_standard.c\
        sync/event_processing_lttng_null.c\
-       sync/event_matching_tcp.c\
        sync/event_matching_broadcast.c\
-       sync/event_analysis_linreg.c\
+       sync/event_matching_distributor.c\
+       sync/event_matching_tcp.c\
        sync/event_analysis_chull.c\
-       sync/event_analysis_eval.c
+       sync/event_analysis_eval.c\
+       sync/event_analysis_linreg.c
 
 lttvinclude_HEADERS = \
        attribute.h\
index 7c28a54879a17596bd28ea4de8e23523f1ef98cd..84562f9825d6886e595e0630b3fd1587b1f092d2 100644 (file)
@@ -7,7 +7,8 @@ unittest_SOURCES = \
        unittest.c\
        data_structures.c\
        event_matching_broadcast.c\
+       event_matching_distributor.c\
        event_matching_tcp.c\
-       event_analysis_linreg.c\
        event_analysis_chull.c\
-       event_analysis_eval.c
+       event_analysis_eval.c\
+       event_analysis_linreg.c
index d4c804b68523c20f7778ea4a6ae25060b8c0a3a5..c4d4d966f31025db80058b69bfadb4762411f8ef 100644 (file)
@@ -99,15 +99,12 @@ bool isAcking(const Message* const ackSegment, const Message* const
  * Convert an IP address from 32 bit form to dotted quad
  *
  * Args:
- *   str:          A preallocated string of length >= 17
+ *   str:          A preallocated string of length >= 16
  *   addr:         Address
  */
 void convertIP(char* const str, const uint32_t addr)
 {
-       struct in_addr iaddr;
-
-       iaddr.s_addr= htonl(addr);
-       strcpy(str, inet_ntoa(iaddr));
+       strcpy(str, inet_ntoa((struct in_addr) {.s_addr= addr}));
 }
 
 
@@ -116,7 +113,7 @@ void convertIP(char* const str, const uint32_t addr)
  */
 void printTCPSegment(const Message* const segment)
 {
-       char saddr[17], daddr[17];
+       char saddr[16], daddr[16];
        SegmentKey* segmentKey;
 
        g_assert(segment->inE->type == TCP);
@@ -329,6 +326,7 @@ void destroyTCPEvent(Event* const event)
        destroyEvent(event);
 }
 
+
 /*
  * Free the memory used by a base Event
  */
@@ -545,7 +543,7 @@ void gdnDestroyBroadcast(gpointer data)
 void destroyBroadcast(Broadcast* const broadcast)
 {
        g_queue_foreach(broadcast->events, &gfDestroyEvent, NULL);
-       g_queue_clear(broadcast->events);
+       g_queue_free(broadcast->events);
        free(broadcast);
 }
 
@@ -576,5 +574,69 @@ void gfDestroyEvent(gpointer data, gpointer user_data)
  */
 double wallTimeSub(const WallTime const* tA, const WallTime const* tB)
 {
-       return tA->seconds - tB->seconds + (tA->nanosec - tB->nanosec) / 1e9;
+       return (double) tA->seconds - tB->seconds + ((double) tA->nanosec - tB->nanosec) / 1e9;
+}
+
+
+/*
+ * Allocate and copy a base event
+ *
+ * Args:
+ *   newEvent:     new event, pointer will be updated
+ *   event:        event to copy
+ */
+void copyEvent(const Event* const event, Event** const newEvent)
+{
+       g_assert(event->event.tcpEvent == NULL);
+
+       *newEvent= malloc(sizeof(Event));
+       memcpy(*newEvent, event, sizeof(Event));
+}
+
+
+/*
+ * Allocate and copy a TCP event
+ *
+ * Args:
+ *   newEvent:     new event, pointer will be updated
+ *   event:        event to copy
+ */
+void copyTCPEvent(const Event* const event, Event** const newEvent)
+{
+       g_assert(event->type == TCP);
+
+       *newEvent= malloc(sizeof(Event));
+       memcpy(*newEvent, event, sizeof(Event));
+
+       (*newEvent)->event.tcpEvent= malloc(sizeof(TCPEvent));
+       memcpy((*newEvent)->event.tcpEvent, event->event.tcpEvent,
+               sizeof(TCPEvent));
+
+       (*newEvent)->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey));
+       memcpy((*newEvent)->event.tcpEvent->segmentKey,
+               event->event.tcpEvent->segmentKey, sizeof(SegmentKey));
+}
+
+
+/*
+ * Allocate and copy a UDP event
+ *
+ * Args:
+ *   newEvent:     new event, pointer will be updated
+ *   event:        event to copy
+ */
+void copyUDPEvent(const Event* const event, Event** const newEvent)
+{
+       g_assert(event->type == UDP);
+
+       *newEvent= malloc(sizeof(Event));
+       memcpy(*newEvent, event, sizeof(Event));
+
+       (*newEvent)->event.udpEvent= malloc(sizeof(UDPEvent));
+       memcpy((*newEvent)->event.udpEvent, event->event.udpEvent,
+               sizeof(UDPEvent));
+
+       (*newEvent)->event.udpEvent->datagramKey= malloc(sizeof(DatagramKey));
+       memcpy((*newEvent)->event.udpEvent->datagramKey,
+               event->event.udpEvent->datagramKey, sizeof(DatagramKey));
 }
index 1f70ade4877a82ae782bfe4695d16d7375895683..b6bc89801692d8d224f0a820d11e60232233d9b8 100644 (file)
@@ -100,6 +100,7 @@ typedef struct _Event
                UDPEvent* udpEvent;
        } event;
 
+       void (*copy)(const struct _Event* const event, struct _Event** const newEvent);
        void (*destroy)(struct _Event* const event);
 } Event;
 
@@ -149,6 +150,9 @@ void gdnDestroyDatagramKey(gpointer data);
 
 // Event-related functions
 void gdnDestroyEvent(gpointer data);
+void copyEvent(const Event* const event, Event** const newEvent);
+void copyTCPEvent(const Event* const event, Event** const newEvent);
+void copyUDPEvent(const Event* const event, Event** const newEvent);
 void destroyEvent(Event* const event);
 void destroyTCPEvent(Event* const event);
 void destroyUDPEvent(Event* const event);
index 49f3e17700f51a0399e61f02cdd2e17f831a576c..cfe387a9a9eee150063b39f177d44377c56ed53f 100644 (file)
@@ -17,6 +17,7 @@
  */
 
 #define _GNU_SOURCE
+#define _ISOC99_SOURCE
 
 #ifdef HAVE_CONFIG_H
 #include <config.h>
@@ -62,6 +63,7 @@ static void positionStream(FILE* stream);
 
 static void gfSum(gpointer data, gpointer userData);
 static void gfSumSquares(gpointer data, gpointer userData);
+static void ghfPrintExchangeRtt(gpointer key, gpointer value, gpointer user_data);
 
 
 static AnalysisModule analysisModuleEval= {
@@ -147,6 +149,10 @@ static void initAnalysisEval(SyncState* const syncState)
                        analysisData->stats->messageStats[i]= calloc(syncState->traceNb,
                                sizeof(MessageStats));
                }
+
+               analysisData->stats->exchangeRtt=
+                       g_hash_table_new_full(&ghfRttKeyHash, &gefRttKeyEqual,
+                               &gdnDestroyRttKey, &gdnDestroyDouble);
        }
 }
 
@@ -181,6 +187,9 @@ static void destroyAnalysisEval(SyncState* const syncState)
                        free(analysisData->stats->messageStats[i]);
                }
                free(analysisData->stats->messageStats);
+
+               g_hash_table_destroy(analysisData->stats->exchangeRtt);
+
                free(analysisData->stats);
        }
 
@@ -202,7 +211,7 @@ static void analyzeMessageEval(SyncState* const syncState, Message* const messag
 {
        AnalysisDataEval* analysisData;
        MessageStats* messageStats;
-       double* rttInfo;
+       double* rtt;
        double tt;
        struct RttKey rttKey;
 
@@ -223,14 +232,19 @@ static void analyzeMessageEval(SyncState* const syncState, Message* const messag
                messageStats->inversionNb++;
        }
 
-       g_assert(message->inE->type == UDP);
-       rttKey.saddr= message->inE->event.udpEvent->datagramKey->saddr;
-       rttKey.daddr= message->inE->event.udpEvent->datagramKey->daddr;
-       rttInfo= g_hash_table_lookup(analysisData->rttInfo, &rttKey);
+       g_assert(message->inE->type == TCP);
+       rttKey.saddr=
+               message->inE->event.tcpEvent->segmentKey->connectionKey.saddr;
+       rttKey.daddr=
+               message->inE->event.tcpEvent->segmentKey->connectionKey.daddr;
+       rtt= g_hash_table_lookup(analysisData->rttInfo, &rttKey);
+       g_debug("rttInfo, looking up (%u, %u)->(%f)", rttKey.saddr,
+               rttKey.daddr, rtt ? *rtt : NAN);
 
-       if (rttInfo)
+       if (rtt)
        {
-               if (tt < *rttInfo / 2.)
+               g_debug("rttInfo, tt: %f rtt / 2: %f", tt, *rtt / 2.);
+               if (tt < *rtt / 2.)
                {
                        messageStats->tooFastNb++;
                }
@@ -253,9 +267,44 @@ static void analyzeMessageEval(SyncState* const syncState, Message* const messag
  */
 static void analyzeExchangeEval(SyncState* const syncState, Exchange* const exchange)
 {
-       AnalysisDataEval* analysisData;
+       AnalysisDataEval* analysisData= syncState->analysisData;
+       Message* m1= g_queue_peek_tail(exchange->acks);
+       Message* m2= exchange->message;
+       struct RttKey* rttKey;
+       double* rtt, * exchangeRtt;
 
-       analysisData= (AnalysisDataEval*) syncState->analysisData;
+       if (!syncState->stats)
+       {
+               return;
+       }
+
+       // (T2 - T1) - (T3 - T4)
+       rtt= malloc(sizeof(double));
+       *rtt= wallTimeSub(&m1->inE->wallTime, &m1->outE->wallTime) -
+               wallTimeSub(&m2->outE->wallTime, &m2->inE->wallTime);
+
+       g_assert(m1->inE->type == TCP);
+       rttKey= malloc(sizeof(struct RttKey));
+       rttKey->saddr=
+               MIN(m1->inE->event.tcpEvent->segmentKey->connectionKey.saddr,
+                       m1->inE->event.tcpEvent->segmentKey->connectionKey.daddr);
+       rttKey->daddr=
+               MAX(m1->inE->event.tcpEvent->segmentKey->connectionKey.saddr,
+                       m1->inE->event.tcpEvent->segmentKey->connectionKey.daddr);
+       exchangeRtt= g_hash_table_lookup(analysisData->stats->exchangeRtt,
+               rttKey);
+
+       if (exchangeRtt)
+       {
+               if (*rtt < *exchangeRtt)
+               {
+                       g_hash_table_replace(analysisData->stats->exchangeRtt, rttKey, rtt);
+               }
+       }
+       else
+       {
+               g_hash_table_insert(analysisData->stats->exchangeRtt, rttKey, rtt);
+       }
 }
 
 
@@ -305,7 +354,7 @@ static void analyzeBroadcastEval(SyncState* const syncState, Broadcast* const br
  *   syncState     container for synchronization data.
  *
  * Returns:
- *   Factors[traceNb] synchronization factors for each trace
+ *   Factors[traceNb] identity factors for each trace
  */
 static GArray* finalizeAnalysisEval(SyncState* const syncState)
 {
@@ -375,6 +424,58 @@ static void printAnalysisStatsEval(SyncState* const syncState)
                                messageStats->noRTTInfoNb, messageStats->total);
                }
        }
+
+       printf("\tRound-trip times:\n"
+               "\t\tHost pair                          RTT from exchanges  RTTs from file (ms)\n");
+       g_hash_table_foreach(analysisData->stats->exchangeRtt,
+               &ghfPrintExchangeRtt, analysisData->rttInfo);
+}
+
+
+/*
+ * A GHFunc for g_hash_table_foreach()
+ *
+ * Args:
+ *   key:          RttKey* where saddr < daddr
+ *   value:        double*, RTT estimated from exchanges
+ *   user_data     GHashTable* rttInfo
+ */
+static void ghfPrintExchangeRtt(gpointer key, gpointer value, gpointer user_data)
+{
+       char addr1[16], addr2[16];
+       struct RttKey* rttKey1= key;
+       struct RttKey rttKey2= {rttKey1->daddr, rttKey1->saddr};
+       double* fileRtt1, *fileRtt2;
+       GHashTable* rttInfo= user_data;
+
+       convertIP(addr1, rttKey1->saddr);
+       convertIP(addr2, rttKey1->daddr);
+
+       fileRtt1= g_hash_table_lookup(rttInfo, rttKey1);
+       fileRtt2= g_hash_table_lookup(rttInfo, &rttKey2);
+
+       printf("\t\t(%15s, %-15s) %-18.3f  ", addr1, addr2, *(double*) value * 1e3);
+
+       if (fileRtt1 || fileRtt2)
+       {
+               if (fileRtt1)
+               {
+                       printf("%.3f", *fileRtt1 * 1e3);
+               }
+               if (fileRtt1 && fileRtt2)
+               {
+                       printf(", ");
+               }
+               if (fileRtt2)
+               {
+                       printf("%.3f", *fileRtt2 * 1e3);
+               }
+       }
+       else
+       {
+               printf("-");
+       }
+       printf("\n");
 }
 
 
@@ -520,6 +621,8 @@ static void readRttInfo(GHashTable* rttInfo, FILE* rttStream)
                }
 
                *rtt/= 1e3;
+               g_debug("rttInfo, Inserting (%u, %u)->(%f)", rttKey->saddr,
+                       rttKey->daddr, *rtt);
                g_hash_table_insert(rttInfo, rttKey, rtt);
 
                positionStream(rttStream);
index 1515bec955bbfbfa092d23c8ca078426b58bf003..159ba615acbba639589285a61e8988802485afb4 100644 (file)
 #include "data_structures.h"
 
 
+struct RttKey
+{
+       uint32_t saddr, daddr;
+};
+
 typedef struct
 {
        unsigned int inversionNb,
@@ -38,13 +43,12 @@ typedef struct
        unsigned int broadcastNb;
 
        MessageStats** messageStats;
+       /* double* exchangeRtt[RttKey]
+        * For this table, saddr and daddr are swapped as necessary such that
+        * saddr < daddr */
+       GHashTable* exchangeRtt;
 } AnalysisStatsEval;
 
-struct RttKey
-{
-       uint32_t saddr, daddr;
-};
-
 typedef struct
 {
        // double* rttInfo[RttKey]
index fbec98ecc28d8e46370bfd256ce246803431a1d0..32112014ca956302c8ea4d83be31950aa7be1219 100644 (file)
@@ -198,13 +198,14 @@ static void matchEventBroadcast(SyncState* const syncState, Event* const event)
                                matchingData->stats->totReceive++;
                        }
 
-                       // s'il est déjà dans pendingBroadcasts
-                       //   l'ajouter Ã  son broadcast
-                       //   s'il y a traceNb Ã©léments
-                       //     le retirer de pending et le livrer Ã  analysis
-                       //     détruire le broadcast (et ses Ã©léments)
-                       // sinon
-                       //   créer un broadcast et l'ajouter Ã  pending
+                       /* if event in pendingBroadcasts:
+                        *   add it to its broadcast
+                        *   if this broadcast has traceNb events:
+                        *     remove it from pending and deliver it to analysis
+                        *     destroy the broadcast (and its elements)
+                        * else:
+                        *   create a broadcast and add it to pending
+                        */
 
                        result=
                                g_hash_table_lookup_extended(matchingData->pendingBroadcasts,
diff --git a/lttv/lttv/sync/event_matching_distributor.c b/lttv/lttv/sync/event_matching_distributor.c
new file mode 100644 (file)
index 0000000..b9d7405
--- /dev/null
@@ -0,0 +1,369 @@
+/* This file is part of the Linux Trace Toolkit viewer
+ * Copyright (C) 2009 Benjamin Poirier <benjamin.poirier@polymtl.ca>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License Version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+ * MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <stddef.h>
+#include <string.h>
+
+#include "event_analysis.h"
+#include "sync_chain.h"
+
+#include "event_matching_distributor.h"
+
+
+struct InitAggregate
+{
+       SyncState* syncState;
+       GQueue* matchingModules;
+};
+
+
+struct GraphAggregate
+{
+       /* Offset whithin Matching module of the field* containing the function
+        * pointer */
+       size_t offset;
+       FILE* stream;
+       unsigned int i, j;
+};
+
+
+// Functions common to all matching modules
+static void initMatchingDistributor(SyncState* const syncState);
+static void destroyMatchingDistributor(SyncState* const syncState);
+
+static void matchEventDistributor(SyncState* const syncState, Event* const
+       event);
+static GArray* finalizeMatchingDistributor(SyncState* const syncState);
+static void printMatchingStatsDistributor(SyncState* const syncState);
+static void writeMatchingGraphsPlotsDistributor(FILE* stream, SyncState* const
+       syncState, const unsigned int i, const unsigned int j);
+static void writeMatchingGraphsOptionsDistributor(FILE* stream, SyncState*
+       const syncState, const unsigned int i, const unsigned int j);
+
+// Functions specific to this module
+static void registerMatchingDistributor() __attribute__((constructor (101)));
+
+void gfInitModule(gpointer data, gpointer user_data);
+void gfDestroyModule(gpointer data, gpointer user_data);
+void gfMatchEvent(gpointer data, gpointer user_data);
+void gfFinalize(gpointer data, gpointer user_data);
+void gfPrintStats(gpointer data, gpointer user_data);
+void gfGraphFunctionCall(gpointer data, gpointer user_data);
+
+
+static MatchingModule matchingModuleDistributor = {
+       .name= "distributor",
+       .canMatch[TCP]= true,
+       .canMatch[UDP]= true,
+       .initMatching= &initMatchingDistributor,
+       .destroyMatching= &destroyMatchingDistributor,
+       .matchEvent= &matchEventDistributor,
+       .finalizeMatching= &finalizeMatchingDistributor,
+       .printMatchingStats= &printMatchingStatsDistributor,
+       .writeMatchingGraphsPlots= &writeMatchingGraphsPlotsDistributor,
+       .writeMatchingGraphsOptions= &writeMatchingGraphsOptionsDistributor,
+};
+
+
+/*
+ * Matching module registering function
+ */
+static void registerMatchingDistributor()
+{
+       g_queue_push_tail(&matchingModules, &matchingModuleDistributor);
+}
+
+
+/*
+ * Matching init function
+ *
+ * This function is called at the beginning of a synchronization run for a set
+ * of traces.
+ *
+ * Build the list and initialize other matching Modules
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ */
+static void initMatchingDistributor(SyncState* const syncState)
+{
+       MatchingDataDistributor* matchingData;
+
+       matchingData= malloc(sizeof(MatchingDataDistributor));
+       syncState->matchingData= matchingData;
+
+       matchingData->distributedModules= g_queue_new();
+       g_queue_foreach(&matchingModules, &gfInitModule, &(struct InitAggregate)
+               {syncState, matchingData->distributedModules});
+}
+
+
+/*
+ * Matching destroy function
+ *
+ * Destroy other modules and free the matching specific data structures
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ */
+static void destroyMatchingDistributor(SyncState* const syncState)
+{
+       MatchingDataDistributor* matchingData= syncState->matchingData;
+
+       g_queue_foreach(matchingData->distributedModules, &gfDestroyModule, NULL);
+
+       g_queue_clear(matchingData->distributedModules);
+       free(syncState->matchingData);
+       syncState->matchingData= NULL;
+}
+
+
+
+/*
+ * Copy event and distribute to matching modules
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ *   event         new event to match
+ */
+static void matchEventDistributor(SyncState* const syncState, Event* const event)
+{
+       MatchingDataDistributor* matchingData= syncState->matchingData;
+
+       g_queue_foreach(matchingData->distributedModules, &gfMatchEvent, event);
+       event->destroy(event);
+}
+
+
+/*
+ * Call the distributed finalization functions and return identity factors
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ *
+ * Returns:
+ *   Factors[traceNb] identity factors for each trace
+ */
+static GArray* finalizeMatchingDistributor(SyncState* const syncState)
+{
+       GArray* factors;
+       unsigned int i;
+       MatchingDataDistributor* matchingData= syncState->matchingData;
+
+       g_queue_foreach(matchingData->distributedModules, &gfFinalize, NULL);
+
+       factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
+               syncState->traceNb);
+       g_array_set_size(factors, syncState->traceNb);
+       for (i= 0; i < syncState->traceNb; i++)
+       {
+               Factors* e;
+
+               e= &g_array_index(factors, Factors, i);
+               e->drift= 1.;
+               e->offset= 0.;
+       }
+
+       return factors;
+}
+
+
+/*
+ * Call the distributed statistics functions (when they exist). Must be called
+ * after finalizeMatching.
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ */
+static void printMatchingStatsDistributor(SyncState* const syncState)
+{
+       MatchingDataDistributor* matchingData= syncState->matchingData;
+
+       g_queue_foreach(matchingData->distributedModules, &gfPrintStats, NULL);
+}
+
+
+/*
+ * Call the distributed graph lines functions (when they exist).
+ *
+ * Args:
+ *   stream:       stream where to write the data
+ *   syncState:    container for synchronization data
+ *   i:            first trace number
+ *   j:            second trace number, garanteed to be larger than i
+ */
+static void writeMatchingGraphsPlotsDistributor(FILE* stream, SyncState* const
+       syncState, const unsigned int i, const unsigned int j)
+{
+       MatchingDataDistributor* matchingData= syncState->matchingData;
+
+       g_queue_foreach(matchingData->distributedModules, &gfGraphFunctionCall,
+               &(struct GraphAggregate) {offsetof(MatchingModule,
+                       writeMatchingGraphsPlots), stream, i, j});
+}
+
+
+/*
+ * Call the distributed graph options functions (when they exist).
+ *
+ * Args:
+ *   stream:       stream where to write the data
+ *   syncState:    container for synchronization data
+ *   i:            first trace number
+ *   j:            second trace number, garanteed to be larger than i
+ */
+static void writeMatchingGraphsOptionsDistributor(FILE* stream, SyncState*
+       const syncState, const unsigned int i, const unsigned int j)
+{
+       MatchingDataDistributor* matchingData= syncState->matchingData;
+
+       g_queue_foreach(matchingData->distributedModules, &gfGraphFunctionCall,
+               &(struct GraphAggregate) {offsetof(MatchingModule,
+                       writeMatchingGraphsOptions), stream, i, j});
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Add and initialize matching module
+ *
+ * Args:
+ *   data          MatchingModule*, module to add
+ *   user_data     InitAggregate*
+ */
+void gfInitModule(gpointer data, gpointer user_data)
+{
+       SyncState* parallelSS;
+       struct InitAggregate* aggregate= user_data;
+       MatchingModule* matchingModule= data;
+
+       if (strcmp(matchingModule->name, matchingModuleDistributor.name) == 0)
+       {
+               return;
+       }
+
+       parallelSS= malloc(sizeof(SyncState));
+       memcpy(parallelSS, aggregate->syncState, sizeof(SyncState));
+       g_queue_push_tail(aggregate->matchingModules, parallelSS);
+
+       parallelSS->matchingModule= matchingModule;
+       parallelSS->matchingModule->initMatching(parallelSS);
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Destroy and remove matching module
+ *
+ * Args:
+ *   data          SyncState* containing the module to destroy
+ *   user_data     NULL
+ */
+void gfDestroyModule(gpointer data, gpointer user_data)
+{
+       SyncState* parallelSS= data;
+
+       parallelSS->matchingModule->destroyMatching(parallelSS);
+       free(parallelSS);
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Args:
+ *   data          SyncState* containing the distributed matching module
+ *   user_data     Event* original event
+ */
+void gfMatchEvent(gpointer data, gpointer user_data)
+{
+       SyncState* parallelSS= data;
+       const Event* event= user_data;
+       Event* newEvent;
+
+       if (parallelSS->matchingModule->canMatch[event->type])
+       {
+               event->copy(event, &newEvent);
+               parallelSS->matchingModule->matchEvent(parallelSS, newEvent);
+       }
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Args:
+ *   data          SyncState* containing the distributed matching module
+ *   user_data     NULL
+ */
+void gfFinalize(gpointer data, gpointer user_data)
+{
+       GArray* factors;
+       SyncState* parallelSS= data;
+
+       factors= parallelSS->matchingModule->finalizeMatching(parallelSS);
+       g_array_free(factors, TRUE);
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Args:
+ *   data          SyncState* containing the distributed matching module
+ *   user_data     NULL
+ */
+void gfPrintStats(gpointer data, gpointer user_data)
+{
+       SyncState* parallelSS= data;
+
+       if (parallelSS->matchingModule->printMatchingStats != NULL)
+       {
+               parallelSS->matchingModule->printMatchingStats(parallelSS);
+       }
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Call a certain matching function
+ *
+ * Args:
+ *   data          SyncState* containing the distributed matching module
+ *   user_data     size_t,
+ */
+void gfGraphFunctionCall(gpointer data, gpointer user_data)
+{
+       SyncState* parallelSS= data;
+       struct GraphAggregate* aggregate= user_data;
+       void (*graphFunction)(FILE* , struct _SyncState*, const unsigned int,
+               const unsigned int)= (void*) data + (size_t) aggregate->offset;
+
+       if (graphFunction != NULL)
+       {
+               graphFunction(aggregate->stream, parallelSS, aggregate->i, aggregate->j);
+       }
+}
diff --git a/lttv/lttv/sync/event_matching_distributor.h b/lttv/lttv/sync/event_matching_distributor.h
new file mode 100644 (file)
index 0000000..b4399e5
--- /dev/null
@@ -0,0 +1,34 @@
+/* This file is part of the Linux Trace Toolkit viewer
+ * Copyright (C) 2009 Benjamin Poirier <benjamin.poirier@polymtl.ca>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License Version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+ * MA 02111-1307, USA.
+ */
+
+#ifndef EVENT_MATCHING_DISTRIBUTOR_H
+#define EVENT_MATCHING_DISTRIBUTOR_H
+
+#include <glib.h>
+
+#include "data_structures.h"
+#include "sync_chain.h"
+
+
+typedef struct
+{
+       // SyncState* distributedModules[]
+       GQueue* distributedModules;
+} MatchingDataDistributor;
+
+#endif
index 8f1d2dcbc3a249af70e8e5f2c62fc1da6c51c2c9..f25e0b61d4bb18b06d4e05cd78375374d353b72d 100644 (file)
@@ -147,12 +147,12 @@ void registerHooks(GArray* hookListList, LttvTracesetContext* const
                        guint old_len;
                        bool registerHook;
 
-                       registerHook= true;
+                       registerHook= false;
                        for (k= 0; k < TYPE_COUNT; k++)
                        {
-                               if (eventTypes[k] && eventHookInfoList[j].eventTypes[k] == false)
+                               if (eventTypes[k] && eventHookInfoList[j].eventTypes[k])
                                {
-                                       registerHook= false;
+                                       registerHook= true;
                                        break;
                                }
                        }
index af1c93bb18156c2bd8ee60329342cc642b3f0569..b79ca4af27d3cab4545eb79b8dc037d0280ef8c8 100644 (file)
@@ -425,16 +425,17 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
                outE->cpuTime= tsc;
                outE->wallTime= wTime;
                outE->type= TCP;
+               outE->copy= &copyTCPEvent;
                outE->destroy= &destroyTCPEvent;
                outE->event.tcpEvent= malloc(sizeof(TCPEvent));
                outE->event.tcpEvent->direction= OUT;
                outE->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey));
                outE->event.tcpEvent->segmentKey->connectionKey.saddr=
-                       ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook,
-                                       3));
+                       htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 3)));
                outE->event.tcpEvent->segmentKey->connectionKey.daddr=
-                       ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook,
-                                       4));
+                       htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 4)));
                outE->event.tcpEvent->segmentKey->tot_len=
                        ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook,
                                        5));
@@ -493,6 +494,7 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
                        inE->cpuTime= tsc;
                        inE->wallTime= wTime;
                        inE->event.tcpEvent= NULL;
+                       inE->copy= &copyEvent;
                        inE->destroy= &destroyEvent;
 
                        skb= (void*) (long) ltt_event_get_long_unsigned(event,
@@ -535,15 +537,16 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
 
                        inE->type= TCP;
                        inE->event.tcpEvent= malloc(sizeof(TCPEvent));
+                       inE->copy= &copyTCPEvent;
                        inE->destroy= &destroyTCPEvent;
                        inE->event.tcpEvent->direction= IN;
                        inE->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey));
                        inE->event.tcpEvent->segmentKey->connectionKey.saddr=
-                               ltt_event_get_unsigned(event,
-                                       lttv_trace_get_hook_field(traceHook, 1));
+                               htonl(ltt_event_get_unsigned(event,
+                                               lttv_trace_get_hook_field(traceHook, 1)));
                        inE->event.tcpEvent->segmentKey->connectionKey.daddr=
-                               ltt_event_get_unsigned(event,
-                                       lttv_trace_get_hook_field(traceHook, 2));
+                               htonl(ltt_event_get_unsigned(event,
+                                               lttv_trace_get_hook_field(traceHook, 2)));
                        inE->event.tcpEvent->segmentKey->tot_len=
                                ltt_event_get_unsigned(event,
                                        lttv_trace_get_hook_field(traceHook, 3));
@@ -613,15 +616,16 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
 
                        inE->type= UDP;
                        inE->event.udpEvent= malloc(sizeof(UDPEvent));
+                       inE->copy= &copyUDPEvent;
                        inE->destroy= &destroyUDPEvent;
                        inE->event.udpEvent->direction= IN;
                        inE->event.udpEvent->datagramKey= malloc(sizeof(DatagramKey));
                        inE->event.udpEvent->datagramKey->saddr=
-                               ltt_event_get_unsigned(event,
-                                       lttv_trace_get_hook_field(traceHook, 1));
+                               htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 1)));
                        inE->event.udpEvent->datagramKey->daddr=
-                               ltt_event_get_unsigned(event,
-                                       lttv_trace_get_hook_field(traceHook, 2));
+                               htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 2)));
                        inE->event.udpEvent->unicast= ltt_event_get_unsigned(event,
                                lttv_trace_get_hook_field(traceHook, 3)) == 0 ? false : true;
                        inE->event.udpEvent->datagramKey->ulen=
index 9652f346328da0f6138f10bed0107ecbe1a82468..cd8532a990c5eed3ff640dc19ec807fedb3dd17a 100644 (file)
@@ -206,24 +206,8 @@ void syncTraceset(LttvTracesetContext* const traceSetContext)
                syncState->graphs= NULL;
        }
 
-       // Identify and initialize processing module
-       syncState->processingData= NULL;
-       if (optionSyncNull.present)
-       {
-               result= g_queue_find_custom(&processingModules, "LTTV-null",
-                       &gcfCompareProcessing);
-       }
-       else
-       {
-               result= g_queue_find_custom(&processingModules, "LTTV-standard",
-                       &gcfCompareProcessing);
-       }
-       g_assert(result != NULL);
-       syncState->processingModule= (ProcessingModule*) result->data;
-
        graphsStream= NULL;
-       if (syncState->graphs &&
-               syncState->processingModule->writeProcessingGraphsPlots != NULL)
+       if (syncState->graphs)
        {
                char* cwd;
                int graphsFp;
@@ -251,11 +235,27 @@ void syncTraceset(LttvTracesetContext* const traceSetContext)
                free(cwd);
        }
 
-       // Identify matching and analysis modules
+       // Identify and initialize modules
+       syncState->processingData= NULL;
+       if (optionSyncNull.present)
+       {
+               result= g_queue_find_custom(&processingModules, "LTTV-null",
+                       &gcfCompareProcessing);
+       }
+       else
+       {
+               result= g_queue_find_custom(&processingModules, "LTTV-standard",
+                       &gcfCompareProcessing);
+       }
+       g_assert(result != NULL);
+       syncState->processingModule= (ProcessingModule*) result->data;
+
+       syncState->matchingData= NULL;
        result= g_queue_find_custom(&matchingModules, "TCP", &gcfCompareMatching);
        g_assert(result != NULL);
        syncState->matchingModule= (MatchingModule*) result->data;
 
+       syncState->analysisData= NULL;
        result= g_queue_find_custom(&analysisModules, optionSyncAnalysis.arg,
                &gcfCompareAnalysis);
        if (result != NULL)
@@ -267,16 +267,12 @@ void syncTraceset(LttvTracesetContext* const traceSetContext)
                g_error("Analysis module '%s' not found", optionSyncAnalysis.arg);
        }
 
-       syncState->processingModule->initProcessing(syncState, traceSetContext);
-
-       syncState->matchingData= NULL;
-       syncState->analysisData= NULL;
-
        if (!optionSyncNull.present)
        {
-               syncState->matchingModule->initMatching(syncState);
                syncState->analysisModule->initAnalysis(syncState);
+               syncState->matchingModule->initMatching(syncState);
        }
+       syncState->processingModule->initProcessing(syncState, traceSetContext);
 
        // Process traceset
        lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
index 731720c840d24414fd9ea8c325b07492f84476e9..3dfc72878591cd7472c68829c08de0253f410166 100644 (file)
@@ -303,14 +303,8 @@ void setupSyncChain(LttvTracesetContext* const traceSetContext)
                syncState->graphs= NULL;
        }
 
-       syncState->processingData= NULL;
-       result= g_queue_find_custom(&processingModules, "LTTV-standard",
-               &gcfCompareProcessing);
-       syncState->processingModule= (ProcessingModule*) result->data;
-
        tracesetChainState->graphsStream= NULL;
-       if (syncState->graphs &&
-               syncState->processingModule->writeProcessingGraphsPlots != NULL)
+       if (syncState->graphs)
        {
                char* cwd;
                int graphsFp;
@@ -338,18 +332,22 @@ void setupSyncChain(LttvTracesetContext* const traceSetContext)
                free(cwd);
        }
 
-       syncState->matchingData= NULL;
-       result= g_queue_find_custom(&matchingModules, "broadcast", &gcfCompareMatching);
-       syncState->matchingModule= (MatchingModule*) result->data;
-
        syncState->analysisData= NULL;
        result= g_queue_find_custom(&analysisModules, "eval",
                &gcfCompareAnalysis);
        syncState->analysisModule= (AnalysisModule*) result->data;
+       syncState->analysisModule->initAnalysis(syncState);
 
-       syncState->processingModule->initProcessing(syncState, traceSetContext);
+       syncState->matchingData= NULL;
+       result= g_queue_find_custom(&matchingModules, "broadcast", &gcfCompareMatching);
+       syncState->matchingModule= (MatchingModule*) result->data;
        syncState->matchingModule->initMatching(syncState);
-       syncState->analysisModule->initAnalysis(syncState);
+
+       syncState->processingData= NULL;
+       result= g_queue_find_custom(&processingModules, "LTTV-standard",
+               &gcfCompareProcessing);
+       syncState->processingModule= (ProcessingModule*) result->data;
+       syncState->processingModule->initProcessing(syncState, traceSetContext);
 }
 
 
This page took 0.038434 seconds and 4 git commands to generate.