1 /* LTTng user-space "fast" library
3 * This daemon is spawned by each traced thread (to share the mmap).
5 * Its job is to dump periodically this buffer to disk (when it receives a
6 * SIGUSR1 from its parent).
8 * It uses the control information in the shared memory area (producer/consumer
11 * When the parent thread dies (yes, those things may happen) ;) , this daemon
12 * will flush the last buffer and write it to disk.
14 * Supplement note for streaming : the daemon is responsible for flushing
15 * periodically the buffer if it is streaming data.
19 * shm memory is typically limited to 4096 units (system wide limit SHMMNI in
20 * /proc/sys/kernel/shmmni). As it requires computation time upon creation, we
21 * do not use it : we will use a shared mmap() instead which is passed through
23 * MAP_SHARED mmap segment. Updated when msync or munmap are called.
25 * Memory mapped by mmap() is preserved across fork(2), with the same
28 * Eventually, there will be two modes :
29 * * Slow thread spawn : a fork() is done for each new thread. If the process
30 * dies, the data is not lost.
31 * * Fast thread spawn : a pthread_create() is done by the application for each
34 * We use a timer to check periodically if the parent died. I think it is less
35 * intrusive than a ptrace() on the parent, which would get every signal. The
36 * side effect of this is that we won't be notified if the parent does an
37 * exec(). In this case, we will just sit there until the parent exits.
40 * Copyright 2006 Mathieu Desnoyers
46 #include <sys/types.h>
62 #include <sys/param.h>
66 #include <asm/atomic.h>
67 #include <asm/timex.h> //for get_cycles()
69 _syscall0(pid_t
,gettid
)
71 #include <ltt/ltt-usertrace-fast.h>
/*
 * Behaviour of a forced sub-buffer switch:
 *   FORCE_ACTIVE - switch while tracing may still be active (writers live).
 *   FORCE_FLUSH  - final flush; used when we know there is no writer left
 *                  (see the daemon shutdown path).
 */
enum force_switch_mode {
	FORCE_ACTIVE,
	FORCE_FLUSH
};
75 /* Writer (the traced application) */
/* Per-thread pointer to the MAP_SHARED trace control area created in
 * ltt_rw_init(); NULL until published there, and reset to NULL again by
 * ltt_usertrace_fast_cleanup(). */
77 __thread
struct ltt_trace_info
*thread_trace_info
= NULL
;
79 void ltt_usertrace_fast_buffer_switch(void)
81 struct ltt_trace_info
*tmp
= thread_trace_info
;
83 kill(tmp
->daemon_id
, SIGUSR1
);
86 /* The cleanup should never be called from a signal handler */
87 static void ltt_usertrace_fast_cleanup(void *arg
)
89 struct ltt_trace_info
*tmp
= thread_trace_info
;
91 thread_trace_info
= NULL
;
92 kill(tmp
->daemon_id
, SIGUSR2
);
93 munmap(tmp
, sizeof(*tmp
));
/* Reader (the disk dumper daemon) */
/*
 * Daemon-side state shared with the signal handlers: traced_pid is read
 * by handler_sigalarm() and polled by the daemon main loop
 * ("if(traced_pid == 0) break" -- 0 means the traced parent died), and
 * parent_exited is polled there too.  volatile keeps the poll loop from
 * caching these values across signal deliveries.
 */
static volatile pid_t traced_pid = 0;
static volatile pid_t traced_tid = 0;
static volatile int parent_exited = 0;
/* signal handling */
/*
 * SIGUSR1: the traced parent requested a buffer switch; the actual read
 * happens in the daemon main loop, this handler only interrupts it.
 * NOTE(review): printf() is not async-signal-safe; kept as-is to
 * preserve the existing debug output.
 */
static void handler_sigusr1(int signo)
{
	printf("LTT Signal %d received : parent buffer switch.\n", signo);
}
/*
 * SIGUSR2: the traced parent exited.
 * NOTE(review): the daemon main loop polls parent_exited, but no visible
 * statement in this (garbled, line-dropped) copy ever sets it -- the flag
 * update most likely belonged in this handler; confirm against upstream.
 * printf() is not async-signal-safe; kept to preserve the debug output.
 */
static void handler_sigusr2(int signo)
{
	printf("LTT Signal %d received : parent exited.\n", signo);
}
115 static void handler_sigalarm(int signo
)
117 printf("LTT Signal %d received\n", signo
);
119 if(getppid() != traced_pid
) {
121 printf("LTT Parent %lu died, cleaning up\n", traced_pid
);
/*
 * NOTE(review): this copy of flush_buffer() is a garbled numbered listing:
 * the original file's line numbers are embedded in the text, statements are
 * wrapped mid-expression, several source lines were dropped (e.g. the
 * "do {" lines opening the two atomic_cmpxchg() retry loops, the early
 * return paths, and the "!= old" halves of the cmpxchg conditions), and
 * "&ltt_buf" has been HTML-entity-mangled into "<t_buf" throughout.
 * The code is left byte-identical below; restore from upstream
 * ltt-usertrace-fast.c before attempting to compile.
 */
127 /* Do a buffer switch. Don't switch if buffer is completely empty */
128 static void flush_buffer(struct ltt_buf
*ltt_buf
, enum force_switch_mode mode
)
131 int offset_begin
, offset_end
, offset_old
;
132 int reserve_commit_diff
;
133 int consumed_old
, consumed_new
;
134 int commit_count
, reserve_count
;
/* Retry loop: recompute offsets until the cmpxchg on ->offset below
 * succeeds (the "do {" line was dropped from this copy). */
138 offset_old
= atomic_read(<t_buf
->offset
);
139 offset_begin
= offset_old
;
141 tsc
= ltt_get_timestamp();
143 /* Error in getting the timestamp : should not happen : it would
144 * mean we are called from an NMI during a write seqlock on xtime. */
148 if(SUBBUF_OFFSET(offset_begin
, ltt_buf
) != 0) {
149 offset_begin
= SUBBUF_ALIGN(offset_begin
, ltt_buf
);
152 /* we do not have to switch : buffer is empty */
155 if(mode
== FORCE_ACTIVE
)
156 offset_begin
+= ltt_subbuf_header_len(ltt_buf
);
157 /* Always begin_switch in FORCE_ACTIVE mode */
159 /* Test new buffer integrity */
160 reserve_commit_diff
=
162 <t_buf
->reserve_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)])
164 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
165 if(reserve_commit_diff
== 0) {
166 /* Next buffer not corrupted. */
167 if(mode
== FORCE_ACTIVE
168 && (offset_begin
-atomic_read(<t_buf
->consumed
))
169 >= ltt_buf
->alloc_size
) {
170 /* We do not overwrite non consumed buffers and we are full : ignore
171 switch while tracing is active. */
175 /* Next subbuffer corrupted. Force pushing reader even in normal mode */
178 offset_end
= offset_begin
;
179 } while(atomic_cmpxchg(<t_buf
->offset
, offset_old
, offset_end
)
183 if(mode
== FORCE_ACTIVE
) {
184 /* Push the reader if necessary */
186 consumed_old
= atomic_read(<t_buf
->consumed
);
187 /* If buffer is in overwrite mode, push the reader consumed count if
188 the write position has reached it and we are not at the first
189 iteration (don't push the reader farther than the writer).
190 This operation can be done concurrently by many writers in the
191 same buffer, the writer being at the farthest write position sub-buffer
192 index in the buffer being the one which will win this loop. */
193 /* If the buffer is not in overwrite mode, pushing the reader only
194 happen if a sub-buffer is corrupted */
195 if((SUBBUF_TRUNC(offset_end
, ltt_buf
)
196 - SUBBUF_TRUNC(consumed_old
, ltt_buf
))
197 >= ltt_buf
->alloc_size
)
198 consumed_new
= SUBBUF_ALIGN(consumed_old
, ltt_buf
);
200 consumed_new
= consumed_old
;
203 } while(atomic_cmpxchg(<t_buf
->consumed
, consumed_old
, consumed_new
)
206 if(consumed_old
!= consumed_new
) {
207 /* Reader pushed : we are the winner of the push, we can therefore
208 reequilibrate reserve and commit. Atomic increment of the commit
209 count permits other writers to play around with this variable
210 before us. We keep track of corrupted_subbuffers even in overwrite
212 we never want to write over a non completely committed sub-buffer :
213 possible causes : the buffer size is too low compared to the unordered
214 data input, or there is a writer who died between the reserve and the
216 if(reserve_commit_diff
) {
217 /* We have to alter the sub-buffer commit count : a sub-buffer is
219 atomic_add(reserve_commit_diff
,
220 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
221 atomic_inc(<t_buf
->corrupted_subbuffers
);
230 /* Concurrency safe because we are the last and only thread to alter this
231 sub-buffer. As long as it is not delivered and read, no other thread can
232 alter the offset, alter the reserve_count or call the
233 client_buffer_end_callback on this sub-buffer.
234 The only remaining threads could be the ones with pending commits. They
235 will have to do the deliver themself.
236 Not concurrency safe in overwrite mode. We detect corrupted subbuffers with
237 commit and reserve counts. We keep a corrupted sub-buffers count and push
238 the readers across these sub-buffers.
239 Not concurrency safe if a writer is stalled in a subbuffer and
240 another writer switches in, finding out it's corrupted. The result will be
241 that the old (uncommitted) subbuffer will be declared corrupted, and that
242 the new subbuffer will be declared corrupted too because of the commit
244 Offset old should never be 0. */
245 ltt_buffer_end_callback(ltt_buf
, tsc
, offset_old
,
246 SUBBUF_INDEX((offset_old
), ltt_buf
));
247 /* Setting this reserve_count will allow the sub-buffer to be delivered by
248 the last committer. */
249 reserve_count
= atomic_add_return((SUBBUF_OFFSET((offset_old
-1),
251 <t_buf
->reserve_count
[SUBBUF_INDEX((offset_old
),
253 if(reserve_count
== atomic_read(
254 <t_buf
->commit_count
[SUBBUF_INDEX((offset_old
), ltt_buf
)])) {
255 ltt_deliver_callback(ltt_buf
, SUBBUF_INDEX((offset_old
), ltt_buf
), NULL
);
259 if(mode
== FORCE_ACTIVE
) {
261 /* This code can be executed unordered : writers may already have written
262 to the sub-buffer before this code gets executed, caution. */
263 /* The commit makes sure that this code is executed before the deliver
264 of this sub-buffer */
265 ltt_buffer_begin_callback(ltt_buf
, tsc
, SUBBUF_INDEX(offset_begin
, ltt_buf
));
266 commit_count
= atomic_add_return(ltt_subbuf_header_len(ltt_buf
),
267 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
268 /* Check if the written buffer has to be delivered */
269 if(commit_count
== atomic_read(
270 <t_buf
->reserve_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)])) {
271 ltt_deliver_callback(ltt_buf
, SUBBUF_INDEX(offset_begin
, ltt_buf
), NULL
);
277 static inline int ltt_buffer_get(struct ltt_buf
*ltt_buf
,
278 unsigned int *offset
)
280 unsigned int consumed_old
, consumed_idx
;
281 consumed_old
= atomic_read(<t_buf
->consumed
);
282 consumed_idx
= SUBBUF_INDEX(consumed_old
, ltt_buf
);
284 if(atomic_read(<t_buf
->commit_count
[consumed_idx
])
285 != atomic_read(<t_buf
->reserve_count
[consumed_idx
])) {
288 if((SUBBUF_TRUNC(atomic_read(<t_buf
->offset
), ltt_buf
)
289 -SUBBUF_TRUNC(consumed_old
, ltt_buf
)) == 0) {
293 *offset
= consumed_old
;
298 static inline int ltt_buffer_put(struct ltt_buf
*ltt_buf
,
301 unsigned int consumed_old
, consumed_new
;
304 consumed_old
= offset
;
305 consumed_new
= SUBBUF_ALIGN(consumed_old
, ltt_buf
);
306 if(atomic_cmpxchg(<t_buf
->consumed
, consumed_old
, consumed_new
)
308 /* We have been pushed by the writer : the last buffer read _is_
310 * It can also happen if this is a buffer we never got. */
313 if(atomic_read(<t_buf
->full
) == 1) {
314 /* tell the client that buffer is now unfull */
315 ret
= futex((unsigned long)<t_buf
->full
,
316 FUTEX_WAKE
, 1, 0, 0, 0);
318 printf("LTT warning : race condition : writer not waiting or too many writers\n");
320 atomic_set(<t_buf
->full
, 0);
/*
 * NOTE(review): garbled numbered listing. The error-handling paths (the
 * checks around ltt_buffer_get/write/fsync, the goto labels implied by the
 * bare perror() lines, and the declaration of "err" and of "pair" used at
 * line 348) were dropped.  Purpose, from what remains: reserve the oldest
 * sub-buffer with ltt_buffer_get(), write() it to fd, fsync, then release
 * it with ltt_buffer_put().  Restore from upstream before compiling.
 */
325 static int read_subbuffer(struct ltt_buf
*ltt_buf
, int fd
)
327 unsigned int consumed_old
;
329 printf("LTT read buffer\n");
332 err
= ltt_buffer_get(ltt_buf
, &consumed_old
);
/* -EAGAIN from ltt_buffer_get means "nothing to read", not a failure. */
334 if(err
!= -EAGAIN
) printf("LTT Reserving sub buffer failed\n");
338 err
= TEMP_FAILURE_RETRY(write(fd
,
340 + (consumed_old
& ((ltt_buf
->alloc_size
)-1)),
341 ltt_buf
->subbuf_size
));
344 perror("Error in writing to file");
348 err
= fsync(pair
->trace
);
351 perror("Error in writing to file");
356 err
= ltt_buffer_put(ltt_buf
, consumed_old
);
/* Non-zero from ltt_buffer_put : the writer pushed us while reading. */
360 printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
361 /* FIXME : we may delete the last written buffer if we wish. */
/*
 * NOTE(review): garbled numbered listing with dropped lines: the function
 * braces, the "while(1)" opening the main loop (before line 437), the
 * pause()/sleep between iterations, and most error-check branches are
 * missing.  Structure, from what remains: install SIGUSR1/SIGUSR2/SIGALRM
 * handlers, restore the caller's signal mask, create the output files
 * under LTT_USERTRACE_ROOT, loop reading sub-buffers until the parent
 * dies, then force-flush and drain both channels and unmap the shared
 * area.  Also note the printf format mismatches: %lu is used for pid_t /
 * pthread_t values without casts (lines 385, 439).  Restore from upstream
 * before compiling.
 */
370 /* This function is called by ltt_rw_init which has signals blocked */
371 static void ltt_usertrace_fast_daemon(struct ltt_trace_info
*shared_trace_info
,
372 sigset_t oldset
, pid_t l_traced_pid
, pthread_t l_traced_tid
)
374 struct sigaction act
;
378 char outfile_name
[PATH_MAX
];
379 char identifier_name
[PATH_MAX
];
382 traced_pid
= l_traced_pid
;
383 traced_tid
= l_traced_tid
;
385 printf("LTT ltt_usertrace_fast_daemon : init is %d, pid is %lu, traced_pid is %lu, traced_tid is %lu\n",
386 shared_trace_info
->init
, getpid(), traced_pid
, traced_tid
);
/* SIGUSR1 : parent requests a buffer switch. */
388 act
.sa_handler
= handler_sigusr1
;
390 sigemptyset(&(act
.sa_mask
));
391 sigaddset(&(act
.sa_mask
), SIGUSR1
);
392 sigaction(SIGUSR1
, &act
, NULL
);
/* SIGUSR2 : parent exited. */
394 act
.sa_handler
= handler_sigusr2
;
396 sigemptyset(&(act
.sa_mask
));
397 sigaddset(&(act
.sa_mask
), SIGUSR2
);
398 sigaction(SIGUSR2
, &act
, NULL
);
/* SIGALRM : periodic check that the parent is still alive. */
400 act
.sa_handler
= handler_sigalarm
;
402 sigemptyset(&(act
.sa_mask
));
403 sigaddset(&(act
.sa_mask
), SIGALRM
);
404 sigaction(SIGALRM
, &act
, NULL
);
/* Re-enable the signals ltt_rw_init() blocked before forking us. */
407 ret
= pthread_sigmask(SIG_SETMASK
, &oldset
, NULL
);
409 printf("LTT Error in pthread_sigmask\n");
414 /* Open output files */
416 ret
= mkdir(LTT_USERTRACE_ROOT
, 0777);
417 if(ret
< 0 && errno
!= EEXIST
) {
418 perror("LTT Error in creating output (mkdir)");
421 ret
= chdir(LTT_USERTRACE_ROOT
);
423 perror("LTT Error in creating output (chdir)");
/* Per-trace unique file names : tid.pid.timestamp. */
426 snprintf(identifier_name
, PATH_MAX
-1, "%lu.%lu.%llu",
427 traced_tid
, traced_pid
, get_cycles());
428 snprintf(outfile_name
, PATH_MAX
-1, "facilities-%s", identifier_name
);
429 fd_fac
= creat(outfile_name
, 0644);
431 snprintf(outfile_name
, PATH_MAX
-1, "cpu-%s", identifier_name
);
432 fd_cpu
= creat(outfile_name
, 0644);
/* Main loop (the "while(1)" opening it was dropped from this copy). */
437 if(traced_pid
== 0) break; /* parent died */
438 if(parent_exited
) break;
439 printf("LTT Doing a buffer switch read. pid is : %lu\n", getpid());
442 ret
= read_subbuffer(&shared_trace_info
->channel
.cpu
, fd_cpu
);
446 ret
= read_subbuffer(&shared_trace_info
->channel
.facilities
, fd_fac
);
450 /* The parent thread is dead and we have finished with the buffer */
452 /* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we know
453 * there is no writer. */
454 flush_buffer(&shared_trace_info
->channel
.cpu
, FORCE_FLUSH
);
456 ret
= read_subbuffer(&shared_trace_info
->channel
.cpu
, fd_cpu
);
460 flush_buffer(&shared_trace_info
->channel
.facilities
, FORCE_FLUSH
);
462 ret
= read_subbuffer(&shared_trace_info
->channel
.facilities
, fd_fac
);
468 munmap(shared_trace_info
, sizeof(*shared_trace_info
));
/* Reader-writer initialization */

/*
 * Which side of the fork this process plays: the traced application
 * (writer) or the disk-dumper daemon (reader).  The child branch of
 * ltt_rw_init() switches it to LTT_ROLE_READER; the library destructor
 * only runs the writer cleanup when it is still LTT_ROLE_WRITER.
 */
static enum ltt_process_role {
	LTT_ROLE_WRITER,
	LTT_ROLE_READER
} role = LTT_ROLE_WRITER;
/*
 * NOTE(review): garbled numbered listing with dropped lines: the function
 * braces, the fork() call that produces "pid" (used at lines 527/535),
 * the setsid() call implied by the "Error setting sid" perror, the error
 * checks after mmap/sigfillset/pthread_sigmask, and the last daemon
 * argument (the thread id, line 544) are all missing.  Intended flow, from
 * what remains: mmap a MAP_SHARED|MAP_ANONYMOUS control area, initialize
 * both channels, block all signals, fork; the parent records the daemon
 * pid and publishes thread_trace_info, the child becomes the dump daemon
 * and never returns.  Also note: the mmap() return value is dereferenced
 * by memset() with no visible MAP_FAILED check -- possibly lost with the
 * dropped lines; confirm.  Restore from upstream before compiling.
 */
480 void ltt_rw_init(void)
483 struct ltt_trace_info
*shared_trace_info
;
485 sigset_t set
, oldset
;
486 pid_t l_traced_pid
= getpid();
487 pid_t l_traced_tid
= gettid();
489 /* parent : create the shared memory map */
490 shared_trace_info
= mmap(0, sizeof(*thread_trace_info
),
491 PROT_READ
|PROT_WRITE
, MAP_SHARED
|MAP_ANONYMOUS
, 0, 0);
492 memset(shared_trace_info
, 0, sizeof(*shared_trace_info
));
493 /* Tricky semaphore : is in a shared memory space, so it's ok for a fast
495 atomic_set(&shared_trace_info
->channel
.facilities
.full
, 0);
496 shared_trace_info
->channel
.facilities
.alloc_size
= LTT_BUF_SIZE_FACILITIES
;
497 shared_trace_info
->channel
.facilities
.subbuf_size
= LTT_SUBBUF_SIZE_FACILITIES
;
498 shared_trace_info
->channel
.facilities
.start
=
499 shared_trace_info
->channel
.facilities_buf
;
500 ltt_buffer_begin_callback(&shared_trace_info
->channel
.facilities
,
501 ltt_get_timestamp(), 0);
503 atomic_set(&shared_trace_info
->channel
.cpu
.full
, 0);
504 shared_trace_info
->channel
.cpu
.alloc_size
= LTT_BUF_SIZE_CPU
;
505 shared_trace_info
->channel
.cpu
.subbuf_size
= LTT_SUBBUF_SIZE_CPU
;
506 shared_trace_info
->channel
.cpu
.start
= shared_trace_info
->channel
.cpu_buf
;
507 ltt_buffer_begin_callback(&shared_trace_info
->channel
.cpu
,
508 ltt_get_timestamp(), 0);
510 shared_trace_info
->init
= 1;
512 /* Disable signals */
513 ret
= sigfillset(&set
);
515 printf("LTT Error in sigfillset\n");
519 ret
= pthread_sigmask(SIG_BLOCK
, &set
, &oldset
);
521 printf("LTT Error in pthread_sigmask\n");
/* Parent branch (pid comes from the fork() dropped from this copy). */
527 shared_trace_info
->daemon_id
= pid
;
528 thread_trace_info
= shared_trace_info
;
531 ret
= pthread_sigmask(SIG_SETMASK
, &oldset
, NULL
);
533 printf("LTT Error in pthread_sigmask\n");
535 } else if(pid
== 0) {
/* Child branch : become the dump daemon. */
538 role
= LTT_ROLE_READER
;
541 perror("Error setting sid");
543 ltt_usertrace_fast_daemon(shared_trace_info
, oldset
, l_traced_pid
,
545 /* Should never return */
549 perror("LTT Error in forking ltt-usertrace-fast");
/* Per-thread pthread cleanup context, pushed by ltt_thread_init() so that
 * ltt_usertrace_fast_cleanup() runs when the thread exits. */
553 static __thread
struct _pthread_cleanup_buffer cleanup_buffer
;
555 void ltt_thread_init(void)
557 _pthread_cleanup_push(&cleanup_buffer
, ltt_usertrace_fast_cleanup
, NULL
);
/*
 * Library constructor: runs before main() in the traced process.
 * NOTE(review): the numbered original dropped lines 564-566 after the
 * printf; the initial tracing setup for the main thread likely lived
 * there -- confirm against upstream and restore.
 */
void __attribute__((constructor)) __ltt_usertrace_fast_init(void)
{
	printf("LTT usertrace-fast init\n");
}
568 void __attribute__((destructor
)) __ltt_usertrace_fast_fini(void)
570 if(role
== LTT_ROLE_WRITER
) {
571 printf("LTT usertrace-fast fini\n");
572 ltt_usertrace_fast_cleanup(NULL
);