#include <arpa/inet.h>
#include <errno.h>
+#include <math.h>
#include <netinet/in.h>
#include <stddef.h>
#include <stdlib.h>
static void readRttInfo(GHashTable* rttInfo, FILE* rttFile);
static void positionStream(FILE* stream);
+static void gfSum(gpointer data, gpointer userData);
+static void gfSumSquares(gpointer data, gpointer userData);
+
static AnalysisModule analysisModuleEval= {
.name= "eval",
if (syncState->stats)
{
- analysisData->stats= malloc(sizeof(AnalysisStatsEval));
+ analysisData->stats= calloc(1, sizeof(AnalysisStatsEval));
analysisData->stats->broadcastDiffSum= 0.;
- analysisData->stats->allStats= malloc(syncState->traceNb *
- sizeof(TracePairStats*));
+ analysisData->stats->messageStats= malloc(syncState->traceNb *
+ sizeof(MessageStats*));
for (i= 0; i < syncState->traceNb; i++)
{
- analysisData->stats->allStats[i]= calloc(syncState->traceNb,
- sizeof(TracePairStats));
+ analysisData->stats->messageStats[i]= calloc(syncState->traceNb,
+ sizeof(MessageStats));
}
}
}
{
for (i= 0; i < syncState->traceNb; i++)
{
- free(analysisData->stats->allStats[i]);
+ free(analysisData->stats->messageStats[i]);
}
- free(analysisData->stats->allStats);
+ free(analysisData->stats->messageStats);
free(analysisData->stats);
}
/*
* Perform analysis on an event pair.
*
+ * Check if there is message inversion or messages that are too fast.
+ *
* Args:
* syncState container for synchronization data
* message structure containing the events
static void analyzeMessageEval(SyncState* const syncState, Message* const message)
{
AnalysisDataEval* analysisData;
+ MessageStats* messageStats;
+ double* rttInfo;
+ double tt;
+ struct RttKey rttKey;
+
+ if (!syncState->stats)
+ {
+ return;
+ }
analysisData= (AnalysisDataEval*) syncState->analysisData;
+ messageStats=
+ &analysisData->stats->messageStats[message->outE->traceNum][message->inE->traceNum];
+
+ messageStats->total++;
+
+ tt= wallTimeSub(&message->inE->wallTime, &message->outE->wallTime);
+ if (tt <= 0)
+ {
+ messageStats->inversionNb++;
+ }
+
+ g_assert(message->inE->type == UDP);
+ rttKey.saddr= message->inE->event.udpEvent->datagramKey->saddr;
+ rttKey.daddr= message->inE->event.udpEvent->datagramKey->daddr;
+ rttInfo= g_hash_table_lookup(analysisData->rttInfo, &rttKey);
+
+ if (rttInfo)
+ {
+ if (tt < *rttInfo / 2.)
+ {
+ messageStats->tooFastNb++;
+ }
+ }
+ else
+ {
+ messageStats->noRTTInfoNb++;
+ }
}
/*
* Perform analysis on multiple messages
*
+ * Measure the RTT
+ *
* Args:
* syncState container for synchronization data
* exchange structure containing the messages
/*
* Perform analysis on muliple events
*
+ * Sum the broadcast differential delays
+ *
* Args:
* syncState container for synchronization data
* broadcast structure containing the events
static void analyzeBroadcastEval(SyncState* const syncState, Broadcast* const broadcast)
{
AnalysisDataEval* analysisData;
+ double sum= 0, squaresSum= 0;
+ double y;
+
+ if (!syncState->stats)
+ {
+ return;
+ }
analysisData= (AnalysisDataEval*) syncState->analysisData;
+
+ g_queue_foreach(broadcast->events, &gfSum, &sum);
+ g_queue_foreach(broadcast->events, &gfSumSquares, &squaresSum);
+
+ analysisData->stats->broadcastNb++;
+ // Because of numerical errors, this can at times be < 0
+ y= squaresSum / g_queue_get_length(broadcast->events) - pow(sum /
+ g_queue_get_length(broadcast->events), 2.);
+ if (y > 0)
+ {
+ analysisData->stats->broadcastDiffSum+= sqrt(y);
+ }
}
printf("Synchronization evaluation analysis stats:\n");
printf("\tsum of broadcast differential delays: %g\n",
analysisData->stats->broadcastDiffSum);
+ printf("\taverage broadcast differential delays: %g\n",
+ analysisData->stats->broadcastDiffSum /
+ analysisData->stats->broadcastNb);
printf("\tIndividual evaluation:\n"
- "\t\tTrace pair Inversions Too fast (No RTT info)\n");
+ "\t\tTrace pair Inversions Too fast (No RTT info) Total\n");
for (i= 0; i < syncState->traceNb; i++)
{
for (j= i + 1; j < syncState->traceNb; j++)
{
- TracePairStats* tpStats;
- const char* format= "\t\t%3d - %-3d %-10u %-10u %u\n";
+ MessageStats* messageStats;
+ const char* format= "\t\t%3d - %-3d %-10u %-10u %-10u %u\n";
- tpStats= &analysisData->stats->allStats[i][j];
+ messageStats= &analysisData->stats->messageStats[i][j];
- printf(format, i, j, tpStats->inversionNb, tpStats->tooFastNb,
- tpStats->noRTTInfoNb);
+ printf(format, i, j, messageStats->inversionNb, messageStats->tooFastNb,
+ messageStats->noRTTInfoNb, messageStats->total);
- tpStats= &analysisData->stats->allStats[j][i];
+ messageStats= &analysisData->stats->messageStats[j][i];
- printf(format, j, i, tpStats->inversionNb, tpStats->tooFastNb,
- tpStats->noRTTInfoNb);
+ printf(format, j, i, messageStats->inversionNb, messageStats->tooFastNb,
+ messageStats->noRTTInfoNb, messageStats->total);
}
}
}
addr.s_addr;
}
+ *rtt/= 1e3;
g_hash_table_insert(rttInfo, rttKey, rtt);
positionStream(rttStream);
free(line);
}
}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Args:
+ * data Event*, a UDP broadcast event
+ * user_data double*, the running sum
+ *
+ * Returns:
+ * Adds the time of the event to the sum
+ */
+static void gfSum(gpointer data, gpointer userData)
+{
+ Event* event= (Event*) data;
+
+ *(double*) userData+= event->wallTime.seconds + event->wallTime.nanosec /
+ 1e9;
+}
+
+
+/*
+ * A GFunc for g_queue_foreach()
+ *
+ * Args:
+ * data Event*, a UDP broadcast event
+ * user_data double*, the running sum
+ *
+ * Returns:
+ * Adds the square of the time of the event to the sum
+ */
+static void gfSumSquares(gpointer data, gpointer userData)
+{
+ Event* event= (Event*) data;
+
+ *(double*) userData+= pow(event->wallTime.seconds + event->wallTime.nanosec
+ / 1e9, 2.);
+}
LttvTraceHook* traceHook;
LttvTracefileContext* tfc;
LttEvent* event;
- LttTime time;
LttCycleCount tsc;
+ LttTime time;
+ WallTime wTime;
LttTrace* trace;
unsigned long traceNum;
struct marker_info* info;
syncState= (SyncState*) traceHook->hook_data;
processingData= (ProcessingDataLTTVStandard*) syncState->processingData;
event= ltt_tracefile_get_event(tfc->tf);
- time= ltt_event_time(event);
- tsc= trace->drift * ltt_event_cycle_count(event) + trace->offset;
info= marker_get_info_from_id(tfc->tf->mdata, event->event_id);
+ tsc= ltt_event_cycle_count(event);
+ time= ltt_event_time(event);
+ wTime.seconds= time.tv_sec;
+ wTime.nanosec= time.tv_nsec;
g_assert(g_hash_table_lookup_extended(processingData->traceNumTable,
trace, NULL, (gpointer*) &traceNum));
g_debug("XXXX process event: time: %ld.%09ld trace: %ld (%p) name: %s ",
- (long) time.tv_sec, time.tv_nsec, traceNum, trace,
+ time.tv_sec, time.tv_nsec, traceNum, trace,
g_quark_to_string(info->name));
if (info->name == LTT_EVENT_DEV_XMIT_EXTENDED)
outE= malloc(sizeof(Event));
outE->traceNum= traceNum;
- outE->time= tsc;
+ outE->cpuTime= tsc;
+ outE->wallTime= wTime;
outE->type= TCP;
outE->destroy= &destroyTCPEvent;
outE->event.tcpEvent= malloc(sizeof(TCPEvent));
inE= malloc(sizeof(Event));
inE->traceNum= traceNum;
- inE->time= tsc;
+ inE->cpuTime= tsc;
+ inE->wallTime= wTime;
inE->event.tcpEvent= NULL;
inE->destroy= &destroyEvent;