1 /* LTTng user-space "fast" library
3 * This daemon is spawned by each traced thread (to share the mmap).
5 * Its job is to dump periodically this buffer to disk (when it receives a
6 * SIGUSR1 from its parent).
8 * It uses the control information in the shared memory area (producer/consumer
11 * When the parent thread dies (yes, those things may happen) ;) , this daemon
12 * will flush the last buffer and write it to disk.
14 * Supplement note for streaming : the daemon is responsible for flushing
15 * periodically the buffer if it is streaming data.
19 * shm memory is typically limited to 4096 units (system wide limit SHMMNI in
20 * /proc/sys/kernel/shmmni). As it requires computation time upon creation, we
21 * do not use it : we will use a shared mmap() instead which is passed through
23 * MAP_SHARED mmap segment. Updated when msync or munmap are called.
25 * Memory mapped by mmap() is preserved across fork(2), with the same
28 * Eventually, there will be two modes :
29 * * Slow thread spawn : a fork() is done for each new thread. If the process
30 * dies, the data is not lost.
31 * * Fast thread spawn : a pthread_create() is done by the application for each
34 * We use a timer to check periodically if the parent died. I think it is less
35 * intrusive than a ptrace() on the parent, which would get every signal. The
36 * side effect of this is that we won't be notified if the parent does an
37 * exec(). In this case, we will just sit there until the parent exits.
40 * Copyright 2006 Mathieu Desnoyers
44 #define inline inline __attribute__((always_inline))
48 #include <sys/types.h>
64 #include <sys/param.h>
68 #include <asm/atomic.h>
69 #include <asm/timex.h> //for get_cycles()
71 _syscall0(pid_t
,gettid
)
73 #include <ltt/ltt-usertrace-fast.h>
76 #define dbg_printf(...) dbg_printf(__VA_ARGS__)
78 #define dbg_printf(...)
79 #endif //LTT_SHOW_DEBUG
82 enum force_switch_mode
{ FORCE_ACTIVE
, FORCE_FLUSH
};
84 /* Writer (the traced application) */
86 __thread
struct ltt_trace_info
*thread_trace_info
= NULL
;
88 void ltt_usertrace_fast_buffer_switch(void)
90 struct ltt_trace_info
*tmp
= thread_trace_info
;
92 kill(tmp
->daemon_id
, SIGUSR1
);
95 /* The cleanup should never be called from a signal handler */
96 static void ltt_usertrace_fast_cleanup(void *arg
)
98 struct ltt_trace_info
*tmp
= thread_trace_info
;
100 thread_trace_info
= NULL
;
101 kill(tmp
->daemon_id
, SIGUSR2
);
102 munmap(tmp
, sizeof(*tmp
));
106 /* Reader (the disk dumper daemon) */
108 static pid_t traced_pid
= 0;
109 static pid_t traced_tid
= 0;
110 static int parent_exited
= 0;
112 /* signal handling */
113 static void handler_sigusr1(int signo
)
115 dbg_printf("LTT Signal %d received : parent buffer switch.\n", signo
);
118 static void handler_sigusr2(int signo
)
120 dbg_printf("LTT Signal %d received : parent exited.\n", signo
);
124 static void handler_sigalarm(int signo
)
126 dbg_printf("LTT Signal %d received\n", signo
);
128 if(getppid() != traced_pid
) {
130 dbg_printf("LTT Parent %lu died, cleaning up\n", traced_pid
);
136 /* Do a buffer switch. Don't switch if buffer is completely empty */
137 static void flush_buffer(struct ltt_buf
*ltt_buf
, enum force_switch_mode mode
)
140 int offset_begin
, offset_end
, offset_old
;
141 int reserve_commit_diff
;
142 int consumed_old
, consumed_new
;
143 int commit_count
, reserve_count
;
147 offset_old
= atomic_read(<t_buf
->offset
);
148 offset_begin
= offset_old
;
150 tsc
= ltt_get_timestamp();
152 /* Error in getting the timestamp : should not happen : it would
153 * mean we are called from an NMI during a write seqlock on xtime. */
157 if(SUBBUF_OFFSET(offset_begin
, ltt_buf
) != 0) {
158 offset_begin
= SUBBUF_ALIGN(offset_begin
, ltt_buf
);
161 /* we do not have to switch : buffer is empty */
164 if(mode
== FORCE_ACTIVE
)
165 offset_begin
+= ltt_subbuf_header_len(ltt_buf
);
166 /* Always begin_switch in FORCE_ACTIVE mode */
168 /* Test new buffer integrity */
169 reserve_commit_diff
=
171 <t_buf
->reserve_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)])
173 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
174 if(reserve_commit_diff
== 0) {
175 /* Next buffer not corrupted. */
176 if(mode
== FORCE_ACTIVE
177 && (offset_begin
-atomic_read(<t_buf
->consumed
))
178 >= ltt_buf
->alloc_size
) {
179 /* We do not overwrite non consumed buffers and we are full : ignore
180 switch while tracing is active. */
184 /* Next subbuffer corrupted. Force pushing reader even in normal mode */
187 offset_end
= offset_begin
;
188 } while(atomic_cmpxchg(<t_buf
->offset
, offset_old
, offset_end
)
192 if(mode
== FORCE_ACTIVE
) {
193 /* Push the reader if necessary */
195 consumed_old
= atomic_read(<t_buf
->consumed
);
196 /* If buffer is in overwrite mode, push the reader consumed count if
197 the write position has reached it and we are not at the first
198 iteration (don't push the reader farther than the writer).
199 This operation can be done concurrently by many writers in the
200 same buffer, the writer being at the fartest write position sub-buffer
201 index in the buffer being the one which will win this loop. */
202 /* If the buffer is not in overwrite mode, pushing the reader only
203 happen if a sub-buffer is corrupted */
204 if((SUBBUF_TRUNC(offset_end
, ltt_buf
)
205 - SUBBUF_TRUNC(consumed_old
, ltt_buf
))
206 >= ltt_buf
->alloc_size
)
207 consumed_new
= SUBBUF_ALIGN(consumed_old
, ltt_buf
);
209 consumed_new
= consumed_old
;
212 } while(atomic_cmpxchg(<t_buf
->consumed
, consumed_old
, consumed_new
)
215 if(consumed_old
!= consumed_new
) {
216 /* Reader pushed : we are the winner of the push, we can therefore
217 reequilibrate reserve and commit. Atomic increment of the commit
218 count permits other writers to play around with this variable
219 before us. We keep track of corrupted_subbuffers even in overwrite
221 we never want to write over a non completely committed sub-buffer :
222 possible causes : the buffer size is too low compared to the unordered
223 data input, or there is a writer who died between the reserve and the
225 if(reserve_commit_diff
) {
226 /* We have to alter the sub-buffer commit count : a sub-buffer is
228 atomic_add(reserve_commit_diff
,
229 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
230 atomic_inc(<t_buf
->corrupted_subbuffers
);
239 /* Concurrency safe because we are the last and only thread to alter this
240 sub-buffer. As long as it is not delivered and read, no other thread can
241 alter the offset, alter the reserve_count or call the
242 client_buffer_end_callback on this sub-buffer.
243 The only remaining threads could be the ones with pending commits. They
244 will have to do the deliver themself.
245 Not concurrency safe in overwrite mode. We detect corrupted subbuffers with
246 commit and reserve counts. We keep a corrupted sub-buffers count and push
247 the readers across these sub-buffers.
248 Not concurrency safe if a writer is stalled in a subbuffer and
249 another writer switches in, finding out it's corrupted. The result will be
250 than the old (uncommited) subbuffer will be declared corrupted, and that
251 the new subbuffer will be declared corrupted too because of the commit
253 Offset old should never be 0. */
254 ltt_buffer_end_callback(ltt_buf
, tsc
, offset_old
,
255 SUBBUF_INDEX((offset_old
), ltt_buf
));
256 /* Setting this reserve_count will allow the sub-buffer to be delivered by
257 the last committer. */
258 reserve_count
= atomic_add_return((SUBBUF_OFFSET((offset_old
-1),
260 <t_buf
->reserve_count
[SUBBUF_INDEX((offset_old
),
262 if(reserve_count
== atomic_read(
263 <t_buf
->commit_count
[SUBBUF_INDEX((offset_old
), ltt_buf
)])) {
264 ltt_deliver_callback(ltt_buf
, SUBBUF_INDEX((offset_old
), ltt_buf
), NULL
);
268 if(mode
== FORCE_ACTIVE
) {
270 /* This code can be executed unordered : writers may already have written
271 to the sub-buffer before this code gets executed, caution. */
272 /* The commit makes sure that this code is executed before the deliver
273 of this sub-buffer */
274 ltt_buffer_begin_callback(ltt_buf
, tsc
, SUBBUF_INDEX(offset_begin
, ltt_buf
));
275 commit_count
= atomic_add_return(ltt_subbuf_header_len(ltt_buf
),
276 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
277 /* Check if the written buffer has to be delivered */
278 if(commit_count
== atomic_read(
279 <t_buf
->reserve_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)])) {
280 ltt_deliver_callback(ltt_buf
, SUBBUF_INDEX(offset_begin
, ltt_buf
), NULL
);
286 static inline int ltt_buffer_get(struct ltt_buf
*ltt_buf
,
287 unsigned int *offset
)
289 unsigned int consumed_old
, consumed_idx
;
290 consumed_old
= atomic_read(<t_buf
->consumed
);
291 consumed_idx
= SUBBUF_INDEX(consumed_old
, ltt_buf
);
293 if(atomic_read(<t_buf
->commit_count
[consumed_idx
])
294 != atomic_read(<t_buf
->reserve_count
[consumed_idx
])) {
297 if((SUBBUF_TRUNC(atomic_read(<t_buf
->offset
), ltt_buf
)
298 -SUBBUF_TRUNC(consumed_old
, ltt_buf
)) == 0) {
302 *offset
= consumed_old
;
307 static inline int ltt_buffer_put(struct ltt_buf
*ltt_buf
,
310 unsigned int consumed_old
, consumed_new
;
313 consumed_old
= offset
;
314 consumed_new
= SUBBUF_ALIGN(consumed_old
, ltt_buf
);
315 if(atomic_cmpxchg(<t_buf
->consumed
, consumed_old
, consumed_new
)
317 /* We have been pushed by the writer : the last buffer read _is_
319 * It can also happen if this is a buffer we never got. */
322 ret
= sem_post(<t_buf
->writer_sem
);
324 printf("error in sem_post");
329 static int read_subbuffer(struct ltt_buf
*ltt_buf
, int fd
)
331 unsigned int consumed_old
;
333 dbg_printf("LTT read buffer\n");
336 err
= ltt_buffer_get(ltt_buf
, &consumed_old
);
338 if(err
!= -EAGAIN
) dbg_printf("LTT Reserving sub buffer failed\n");
342 err
= TEMP_FAILURE_RETRY(write(fd
,
344 + (consumed_old
& ((ltt_buf
->alloc_size
)-1)),
345 ltt_buf
->subbuf_size
));
348 perror("Error in writing to file");
352 err
= fsync(pair
->trace
);
355 perror("Error in writing to file");
360 err
= ltt_buffer_put(ltt_buf
, consumed_old
);
364 dbg_printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
365 /* FIXME : we may delete the last written buffer if we wish. */
374 /* This function is called by ltt_rw_init which has signals blocked */
375 static void ltt_usertrace_fast_daemon(struct ltt_trace_info
*shared_trace_info
,
376 sigset_t oldset
, pid_t l_traced_pid
, pthread_t l_traced_tid
)
379 struct sigaction act
;
382 char outfile_name
[PATH_MAX
];
383 char identifier_name
[PATH_MAX
];
386 traced_pid
= l_traced_pid
;
387 traced_tid
= l_traced_tid
;
389 dbg_printf("LTT ltt_usertrace_fast_daemon : init is %d, pid is %lu, traced_pid is %lu, traced_tid is %lu\n",
390 shared_trace_info
->init
, getpid(), traced_pid
, traced_tid
);
392 act
.sa_handler
= handler_sigusr1
;
394 sigemptyset(&(act
.sa_mask
));
395 sigaddset(&(act
.sa_mask
), SIGUSR1
);
396 sigaction(SIGUSR1
, &act
, NULL
);
398 act
.sa_handler
= handler_sigusr2
;
400 sigemptyset(&(act
.sa_mask
));
401 sigaddset(&(act
.sa_mask
), SIGUSR2
);
402 sigaction(SIGUSR2
, &act
, NULL
);
404 act
.sa_handler
= handler_sigalarm
;
406 sigemptyset(&(act
.sa_mask
));
407 sigaddset(&(act
.sa_mask
), SIGALRM
);
408 sigaction(SIGALRM
, &act
, NULL
);
412 /* Open output files */
414 ret
= mkdir(LTT_USERTRACE_ROOT
, 0777);
415 if(ret
< 0 && errno
!= EEXIST
) {
416 perror("LTT Error in creating output (mkdir)");
419 ret
= chdir(LTT_USERTRACE_ROOT
);
421 perror("LTT Error in creating output (chdir)");
424 snprintf(identifier_name
, PATH_MAX
-1, "%lu.%lu.%llu",
425 traced_tid
, traced_pid
, get_cycles());
426 snprintf(outfile_name
, PATH_MAX
-1, "process-%s", identifier_name
);
427 #ifndef LTT_NULL_OUTPUT_TEST
428 fd_process
= creat(outfile_name
, 0644);
431 ret
= symlink("/dev/null", outfile_name
);
433 perror("error in symlink");
435 fd_process
= open(outfile_name
, O_WRONLY
);
437 perror("Error in open");
439 #endif //LTT_NULL_OUTPUT_TEST
442 ret
= sigsuspend(&oldset
);
444 perror("LTT Error in sigsuspend\n");
447 if(traced_pid
== 0) break; /* parent died */
448 if(parent_exited
) break;
449 dbg_printf("LTT Doing a buffer switch read. pid is : %lu\n", getpid());
452 ret
= read_subbuffer(&shared_trace_info
->channel
.process
, fd_process
);
456 /* The parent thread is dead and we have finished with the buffer */
458 /* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we know
459 * there is no writer. */
460 flush_buffer(&shared_trace_info
->channel
.process
, FORCE_FLUSH
);
462 ret
= read_subbuffer(&shared_trace_info
->channel
.process
, fd_process
);
468 ret
= sem_destroy(&shared_trace_info
->channel
.process
.writer_sem
);
470 perror("error in sem_destroy");
472 munmap(shared_trace_info
, sizeof(*shared_trace_info
));
478 /* Reader-writer initialization */
480 static enum ltt_process_role
{ LTT_ROLE_WRITER
, LTT_ROLE_READER
}
481 role
= LTT_ROLE_WRITER
;
484 void ltt_rw_init(void)
487 struct ltt_trace_info
*shared_trace_info
;
489 sigset_t set
, oldset
;
490 pid_t l_traced_pid
= getpid();
491 pid_t l_traced_tid
= gettid();
493 /* parent : create the shared memory map */
494 shared_trace_info
= mmap(0, sizeof(*thread_trace_info
),
495 PROT_READ
|PROT_WRITE
, MAP_SHARED
|MAP_ANONYMOUS
, 0, 0);
496 shared_trace_info
->init
=0;
497 shared_trace_info
->filter
=0;
498 shared_trace_info
->daemon_id
=0;
499 shared_trace_info
->nesting
=0;
500 memset(&shared_trace_info
->channel
.process
, 0,
501 sizeof(shared_trace_info
->channel
.process
));
503 ret
= sem_init(&shared_trace_info
->channel
.process
.writer_sem
, 1,
506 perror("error in sem_init");
508 shared_trace_info
->channel
.process
.alloc_size
= LTT_BUF_SIZE_PROCESS
;
509 shared_trace_info
->channel
.process
.subbuf_size
= LTT_SUBBUF_SIZE_PROCESS
;
510 shared_trace_info
->channel
.process
.start
=
511 shared_trace_info
->channel
.process_buf
;
512 ltt_buffer_begin_callback(&shared_trace_info
->channel
.process
,
513 ltt_get_timestamp(), 0);
515 shared_trace_info
->init
= 1;
517 /* Disable signals */
518 ret
= sigfillset(&set
);
520 dbg_printf("LTT Error in sigfillset\n");
524 ret
= pthread_sigmask(SIG_BLOCK
, &set
, &oldset
);
526 dbg_printf("LTT Error in pthread_sigmask\n");
532 shared_trace_info
->daemon_id
= pid
;
533 thread_trace_info
= shared_trace_info
;
536 ret
= pthread_sigmask(SIG_SETMASK
, &oldset
, NULL
);
538 dbg_printf("LTT Error in pthread_sigmask\n");
540 } else if(pid
== 0) {
543 role
= LTT_ROLE_READER
;
545 //Not a good idea to renice, unless futex wait eventually implement
546 //priority inheritence.
549 // perror("Error in nice");
552 perror("Error setting sid");
554 ltt_usertrace_fast_daemon(shared_trace_info
, oldset
, l_traced_pid
,
556 /* Should never return */
560 perror("LTT Error in forking ltt-usertrace-fast");
564 static __thread
struct _pthread_cleanup_buffer cleanup_buffer
;
566 void ltt_thread_init(void)
568 _pthread_cleanup_push(&cleanup_buffer
, ltt_usertrace_fast_cleanup
, NULL
);
572 void __attribute__((constructor
)) __ltt_usertrace_fast_init(void)
574 dbg_printf("LTT usertrace-fast init\n");
579 void __attribute__((destructor
)) __ltt_usertrace_fast_fini(void)
581 if(role
== LTT_ROLE_WRITER
) {
582 dbg_printf("LTT usertrace-fast fini\n");
583 ltt_usertrace_fast_cleanup(NULL
);