1 /* LTTng user-space "fast" library
3 * This daemon is spawned by each traced thread (to share the mmap).
5 * Its job is to dump periodically this buffer to disk (when it receives a
6 * SIGUSR1 from its parent).
8 * It uses the control information in the shared memory area (producer/consumer
11 * When the parent thread dies (yes, those things may happen) ;) , this daemon
12 * will flush the last buffer and write it to disk.
14 * Supplement note for streaming : the daemon is responsible for flushing
15 * periodically the buffer if it is streaming data.
19 * shm memory is typically limited to 4096 units (system wide limit SHMMNI in
20 * /proc/sys/kernel/shmmni). As it requires computation time upon creation, we
21 * do not use it : we will use a shared mmap() instead which is passed through
23 * MAP_SHARED mmap segment. Updated when msync or munmap are called.
25 * Memory mapped by mmap() is preserved across fork(2), with the same
28 * Eventually, there will be two modes :
29 * * Slow thread spawn : a fork() is done for each new thread. If the process
30 * dies, the data is not lost.
31 * * Fast thread spawn : a pthread_create() is done by the application for each
34 * We use a timer to check periodically if the parent died. I think it is less
35 * intrusive than a ptrace() on the parent, which would get every signal. The
36 * side effect of this is that we won't be notified if the parent does an
37 * exec(). In this case, we will just sit there until the parent exits.
40 * Copyright 2006 Mathieu Desnoyers
45 #include <sys/types.h>
61 #include <sys/param.h>
64 #include <asm/atomic.h>
65 #include <asm/timex.h> //for get_cycles()
67 #include "ltt-usertrace-fast.h"
/* Sub-buffer switch modes for flush_buffer(). */
enum force_switch_mode {
	FORCE_ACTIVE,	/* switch while tracing may still be active (writer side) */
	FORCE_FLUSH	/* final flush: no writer left, force the switch */
};
/* Writer (the traced application) */

/* Per-thread pointer to the shared writer/daemon trace control area.
 * NULL until ltt_rw_init() maps it; cleared again by
 * ltt_usertrace_fast_cleanup(). */
__thread struct ltt_trace_info *thread_trace_info = NULL;
75 void ltt_usertrace_fast_buffer_switch(void)
77 struct ltt_trace_info
*tmp
= thread_trace_info
;
79 kill(tmp
->daemon_id
, SIGUSR1
);
82 /* The cleanup should never be called from a signal handler */
83 static void ltt_usertrace_fast_cleanup(void *arg
)
85 struct ltt_trace_info
*tmp
= thread_trace_info
;
87 thread_trace_info
= NULL
;
88 kill(tmp
->daemon_id
, SIGUSR2
);
89 munmap(tmp
, sizeof(*tmp
));
/* Reader (the disk dumper daemon) */

/* PID of the traced parent process; polled by handler_sigalarm(). */
static pid_t traced_pid = 0;
/* TID of the traced thread (used in the output file names). */
static pid_t traced_tid = 0;
/* Break condition polled by the daemon loop.  NOTE(review): its
 * setter is not visible in this extraction — presumably set when the
 * parent announces its exit (SIGUSR2); confirm against original. */
static int parent_exited = 0;
/* SIGUSR1 handler: the traced parent requested a buffer switch; the
 * daemon's read loop performs the actual dump.
 * NOTE(review): printf is not async-signal-safe; kept to preserve the
 * original behaviour. */
static void handler_sigusr1(int signo)
{
	printf("LTT Signal %d received : parent buffer switch.\n", signo);
}
/* SIGUSR2 handler: the traced parent is exiting.
 * NOTE(review): this extraction is truncated — the braces are missing,
 * and the original presumably also sets parent_exited = 1 here (the
 * flag is polled in the daemon loop but never set in this view);
 * confirm against the original file before rebuilding. */
105 static void handler_sigusr2(int signo
)
107 	printf("LTT Signal %d received : parent exited.\n", signo
);
/* SIGALRM handler: periodic liveness check of the traced parent.
 * NOTE(review): extraction truncated — the braces and the statements
 * executed after detecting the parent's death (presumably clearing
 * traced_pid and/or re-arming the alarm) are missing; confirm against
 * the original file.
 * NOTE(review): "%lu" does not match pid_t; the argument should be
 * cast to unsigned long. */
113 	printf("LTT Signal %d received\n", signo
);
/* When the parent dies, getppid() no longer matches the recorded
 * pid. */
115 	if(getppid() != traced_pid
) {
117 		printf("LTT Parent %lu died, cleaning up\n", traced_pid
);
/* NOTE(review): this extraction is garbled: the leading integers on
 * many lines are the original file's line numbers fused into the
 * text; "&ltt_buf" has been mangled into "<t_buf" by an HTML entity
 * pass ("&lt" -> "<"); and several original lines — the opening
 * brace, the declaration of tsc, the "do {" headers of both cmpxchg
 * retry loops, the atomic_read( prefixes of the reserve/commit reads,
 * various returns and closing braces — are missing.  Code is left
 * byte-identical below; only comments were added.  Reconstruct
 * against the original ltt-usertrace-fast.c before compiling. */
123 /* Do a buffer switch. Don't switch if buffer is completely empty */
124 static void flush_buffer(struct ltt_buf
*ltt_buf
, enum force_switch_mode mode
)
127 int offset_begin
, offset_end
, offset_old
;
128 int reserve_commit_diff
;
129 int consumed_old
, consumed_new
;
130 int commit_count
, reserve_count
;
/* Snapshot the current write offset; it is the "old" value for the
 * offset cmpxchg retry loop further down. */
134 offset_old
= atomic_read(<t_buf
->offset
);
135 offset_begin
= offset_old
;
/* NOTE(review): tsc is assigned here but its declaration is on a
 * missing line — presumably a timestamp type from
 * ltt-usertrace-fast.h; confirm. */
137 tsc
= ltt_get_timestamp();
139 /* Error in getting the timestamp : should not happen : it would
140 * mean we are called from an NMI during a write seqlock on xtime. */
/* If the current sub-buffer is non-empty, align the begin offset to
 * the next sub-buffer boundary; an empty buffer needs no switch. */
144 if(SUBBUF_OFFSET(offset_begin
, ltt_buf
) != 0) {
145 offset_begin
= SUBBUF_ALIGN(offset_begin
, ltt_buf
);
148 /* we do not have to switch : buffer is empty */
/* In ACTIVE mode the fresh sub-buffer starts with a header; reserve
 * room for it. */
151 if(mode
== FORCE_ACTIVE
)
152 offset_begin
+= ltt_subbuf_header_len(ltt_buf
);
153 /* Always begin_switch in FORCE_ACTIVE mode */
155 /* Test new buffer integrity */
156 reserve_commit_diff
=
158 <t_buf
->reserve_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)])
160 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
161 if(reserve_commit_diff
== 0) {
162 /* Next buffer not corrupted. */
163 if(mode
== FORCE_ACTIVE
164 && (offset_begin
-atomic_read(<t_buf
->consumed
))
165 >= ltt_buf
->alloc_size
) {
166 /* We do not overwrite non consumed buffers and we are full : ignore
167 switch while tracing is active. */
171 /* Next subbuffer corrupted. Force pushing reader even in normal mode */
174 offset_end
= offset_begin
;
/* NOTE(review): the matching "do {" header of this retry loop is on
 * a missing line; the loop retries until the offset cmpxchg wins. */
175 } while(atomic_cmpxchg(<t_buf
->offset
, offset_old
, offset_end
)
/* Second phase (ACTIVE mode only): push the reader's consumed count
 * forward if the writer has wrapped over it. */
179 if(mode
== FORCE_ACTIVE
) {
180 /* Push the reader if necessary */
182 consumed_old
= atomic_read(<t_buf
->consumed
);
183 /* If buffer is in overwrite mode, push the reader consumed count if
184 the write position has reached it and we are not at the first
185 iteration (don't push the reader farther than the writer).
186 This operation can be done concurrently by many writers in the
187 same buffer, the writer being at the fartest write position sub-buffer
188 index in the buffer being the one which will win this loop. */
189 /* If the buffer is not in overwrite mode, pushing the reader only
190 happen if a sub-buffer is corrupted */
191 if((SUBBUF_TRUNC(offset_end
, ltt_buf
)
192 - SUBBUF_TRUNC(consumed_old
, ltt_buf
))
193 >= ltt_buf
->alloc_size
)
194 consumed_new
= SUBBUF_ALIGN(consumed_old
, ltt_buf
);
196 consumed_new
= consumed_old
;
/* NOTE(review): "do {" header of this second retry loop is also on a
 * missing line. */
199 } while(atomic_cmpxchg(<t_buf
->consumed
, consumed_old
, consumed_new
)
202 if(consumed_old
!= consumed_new
) {
203 /* Reader pushed : we are the winner of the push, we can therefore
204 reequilibrate reserve and commit. Atomic increment of the commit
205 count permits other writers to play around with this variable
206 before us. We keep track of corrupted_subbuffers even in overwrite
208 we never want to write over a non completely committed sub-buffer :
209 possible causes : the buffer size is too low compared to the unordered
210 data input, or there is a writer who died between the reserve and the
212 if(reserve_commit_diff
) {
213 /* We have to alter the sub-buffer commit count : a sub-buffer is
215 atomic_add(reserve_commit_diff
,
216 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
217 atomic_inc(<t_buf
->corrupted_subbuffers
);
226 /* Concurrency safe because we are the last and only thread to alter this
227 sub-buffer. As long as it is not delivered and read, no other thread can
228 alter the offset, alter the reserve_count or call the
229 client_buffer_end_callback on this sub-buffer.
230 The only remaining threads could be the ones with pending commits. They
231 will have to do the deliver themself.
232 Not concurrency safe in overwrite mode. We detect corrupted subbuffers with
233 commit and reserve counts. We keep a corrupted sub-buffers count and push
234 the readers across these sub-buffers.
235 Not concurrency safe if a writer is stalled in a subbuffer and
236 another writer switches in, finding out it's corrupted. The result will be
237 than the old (uncommited) subbuffer will be declared corrupted, and that
238 the new subbuffer will be declared corrupted too because of the commit
240 Offset old should never be 0. */
/* Close the sub-buffer that was just switched away from. */
241 ltt_buffer_end_callback(ltt_buf
, tsc
, offset_old
,
242 SUBBUF_INDEX((offset_old
), ltt_buf
));
243 /* Setting this reserve_count will allow the sub-buffer to be delivered by
244 the last committer. */
245 reserve_count
= atomic_add_return((SUBBUF_OFFSET((offset_old
-1),
247 <t_buf
->reserve_count
[SUBBUF_INDEX((offset_old
),
/* If every reservation in the old sub-buffer is committed, this
 * thread is the one that delivers it. */
249 if(reserve_count
== atomic_read(
250 <t_buf
->commit_count
[SUBBUF_INDEX((offset_old
), ltt_buf
)])) {
251 ltt_deliver_callback(ltt_buf
, SUBBUF_INDEX((offset_old
), ltt_buf
), NULL
);
/* In ACTIVE mode, also open the new sub-buffer: write its header and
 * commit the header bytes. */
255 if(mode
== FORCE_ACTIVE
) {
257 /* This code can be executed unordered : writers may already have written
258 to the sub-buffer before this code gets executed, caution. */
259 /* The commit makes sure that this code is executed before the deliver
260 of this sub-buffer */
261 ltt_buffer_begin_callback(ltt_buf
, tsc
, SUBBUF_INDEX(offset_begin
, ltt_buf
));
262 commit_count
= atomic_add_return(ltt_subbuf_header_len(ltt_buf
),
263 <t_buf
->commit_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)]);
264 /* Check if the written buffer has to be delivered */
265 if(commit_count
== atomic_read(
266 <t_buf
->reserve_count
[SUBBUF_INDEX(offset_begin
, ltt_buf
)])) {
267 ltt_deliver_callback(ltt_buf
, SUBBUF_INDEX(offset_begin
, ltt_buf
), NULL
);
/* Reserve the next sub-buffer for reading; on success, *offset is set
 * to the consumed position to read from.
 * NOTE(review): extraction garbled — "<t_buf" is mangled "&ltt_buf",
 * leading integers are original line numbers, and the opening brace
 * plus the return statements (presumably -EAGAIN when the sub-buffer
 * is not fully committed or when the buffer is empty, 0 on success)
 * are on missing lines; confirm against the original file. */
273 static inline int ltt_buffer_get(struct ltt_buf
*ltt_buf
,
274 unsigned int *offset
)
276 unsigned int consumed_old
, consumed_idx
;
277 consumed_old
= atomic_read(<t_buf
->consumed
);
278 consumed_idx
= SUBBUF_INDEX(consumed_old
, ltt_buf
);
/* The consumed sub-buffer is readable only when every reservation in
 * it has been committed. */
280 if(atomic_read(<t_buf
->commit_count
[consumed_idx
])
281 != atomic_read(<t_buf
->reserve_count
[consumed_idx
])) {
/* Write position still in the consumed sub-buffer: nothing to read
 * yet. */
284 if((SUBBUF_TRUNC(atomic_read(<t_buf
->offset
), ltt_buf
)
285 -SUBBUF_TRUNC(consumed_old
, ltt_buf
)) == 0) {
289 *offset
= consumed_old
;
/* Release a sub-buffer after reading: advance the consumed count to
 * the next sub-buffer boundary and wake a writer blocked on a full
 * buffer.
 * NOTE(review): extraction garbled — "<t_buf" is mangled "&ltt_buf";
 * the second parameter (unsigned int offset, assigned to consumed_old
 * below), the declaration of ret, the opening brace and the failure
 * branch of the cmpxchg are on missing lines; confirm against the
 * original file. */
294 static inline int ltt_buffer_put(struct ltt_buf
*ltt_buf
,
297 unsigned int consumed_old
, consumed_new
;
300 consumed_old
= offset
;
301 consumed_new
= SUBBUF_ALIGN(consumed_old
, ltt_buf
);
/* If the cmpxchg loses, the writer pushed the reader past this
 * sub-buffer while we were reading it. */
302 if(atomic_cmpxchg(<t_buf
->consumed
, consumed_old
, consumed_new
)
304 /* We have been pushed by the writer : the last buffer read _is_
306 * It can also happen if this is a buffer we never got. */
/* Writer parked on a full buffer: wake exactly one waiter via futex
 * and clear the flag. */
309 if(atomic_read(<t_buf
->full
) == 1) {
310 /* tell the client that buffer is now unfull */
311 ret
= futex((unsigned long)<t_buf
->full
,
312 FUTEX_WAKE
, 1, 0, 0, 0);
/* NOTE(review): this warning presumably fires when the futex wake
 * count is unexpected — the guarding condition is on a missing
 * line. */
314 printf("LTT warning : race condition : writer not waiting or too many writers\n");
316 atomic_set(<t_buf
->full
, 0);
/* Read one sub-buffer from ltt_buf and append it to file descriptor
 * fd: get the consumed position, write subbuf_size bytes, fsync, then
 * release the sub-buffer.
 * NOTE(review): extraction garbled — the opening brace, the
 * declaration of err, the error-return paths and the buffer base
 * pointer in the write() call (only the "+ (consumed_old & ...)"
 * offset part survived) are on missing lines; confirm against the
 * original file. */
321 static int read_subbuffer(struct ltt_buf
*ltt_buf
, int fd
)
323 unsigned int consumed_old
;
325 printf("LTT read buffer\n");
328 err
= ltt_buffer_get(ltt_buf
, &consumed_old
);
330 if(err
!= -EAGAIN
) printf("LTT Reserving sub buffer failed\n");
/* Write the sub-buffer; TEMP_FAILURE_RETRY restarts on EINTR. */
334 err
= TEMP_FAILURE_RETRY(write(fd
,
336 + (consumed_old
& ((ltt_buf
->alloc_size
)-1)),
337 ltt_buf
->subbuf_size
));
340 perror("Error in writing to file");
/* NOTE(review): "pair->trace" references a name not visible anywhere
 * in this extraction — likely a leftover from another revision; one
 * would expect fsync(fd) here. Confirm against the original. */
344 err
= fsync(pair
->trace
);
347 perror("Error in writing to file");
352 err
= ltt_buffer_put(ltt_buf
, consumed_old
);
/* ltt_buffer_put() failing here means the writer overran us. */
356 printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
357 /* FIXME : we may delete the last written buffer if we wish. */
/* NOTE(review): extraction garbled — leading integers are original
 * line numbers; missing lines include the opening brace, the
 * declarations of ret, fd_fac and fd_cpu, the error-exit paths after
 * the perror() calls, the "while(1)" header of the dump loop (before
 * original line 433), the blocking wait inside the loop, and the
 * final close()/exit() teardown after the munmap.  Code left
 * byte-identical; comments only. */
366 /* This function is called by ltt_rw_init which has signals blocked */
367 static void ltt_usertrace_fast_daemon(struct ltt_trace_info
*shared_trace_info
,
368 sigset_t oldset
, pid_t l_traced_pid
, pthread_t l_traced_tid
)
370 struct sigaction act
;
374 char outfile_name
[PATH_MAX
];
375 char identifier_name
[PATH_MAX
];
/* Record the traced parent's identity for the signal handlers. */
378 traced_pid
= l_traced_pid
;
379 traced_tid
= l_traced_tid
;
/* NOTE(review): "%lu" does not match the int/pid_t arguments — cast
 * to unsigned long in the original. */
381 printf("LTT ltt_usertrace_fast_daemon : init is %d, pid is %lu, traced_pid is %lu, traced_tid is %lu\n",
382 shared_trace_info
->init
, getpid(), traced_pid
, traced_tid
);
/* Install the three handlers: SIGUSR1 = buffer switch request,
 * SIGUSR2 = parent exit, SIGALRM = parent liveness poll. */
384 act
.sa_handler
= handler_sigusr1
;
386 sigemptyset(&(act
.sa_mask
));
387 sigaddset(&(act
.sa_mask
), SIGUSR1
);
388 sigaction(SIGUSR1
, &act
, NULL
);
390 act
.sa_handler
= handler_sigusr2
;
392 sigemptyset(&(act
.sa_mask
));
393 sigaddset(&(act
.sa_mask
), SIGUSR2
);
394 sigaction(SIGUSR2
, &act
, NULL
);
396 act
.sa_handler
= handler_sigalarm
;
398 sigemptyset(&(act
.sa_mask
));
399 sigaddset(&(act
.sa_mask
), SIGALRM
);
400 sigaction(SIGALRM
, &act
, NULL
);
/* Restore the signal mask the parent saved before fork(). */
403 ret
= pthread_sigmask(SIG_SETMASK
, &oldset
, NULL
);
405 printf("LTT Error in pthread_sigmask\n");
410 /* Open output files */
412 ret
= mkdir(LTT_USERTRACE_ROOT
, 0777);
413 if(ret
< 0 && errno
!= EEXIST
) {
414 perror("LTT Error in creating output (mkdir)");
417 ret
= chdir(LTT_USERTRACE_ROOT
);
419 perror("LTT Error in creating output (chdir)");
/* Per-trace file names: <pid>.<tid>.<cycle counter>. */
422 snprintf(identifier_name
, PATH_MAX
-1, "%lu.%lu.%llu",
423 traced_pid
, traced_tid
, get_cycles());
424 snprintf(outfile_name
, PATH_MAX
-1, "facilities-%s", identifier_name
);
425 fd_fac
= creat(outfile_name
, 0644);
427 snprintf(outfile_name
, PATH_MAX
-1, "cpu-%s", identifier_name
);
428 fd_cpu
= creat(outfile_name
, 0644);
/* Dump loop (loop header on a missing line): exit when the parent is
 * gone, otherwise drain both channels. */
433 if(traced_pid
== 0) break; /* parent died */
434 if(parent_exited
) break;
435 printf("LTT Doing a buffer switch read. pid is : %lu\n", getpid());
438 ret
= read_subbuffer(&shared_trace_info
->channel
.cpu
, fd_cpu
);
442 ret
= read_subbuffer(&shared_trace_info
->channel
.facilities
, fd_fac
);
446 /* The parent thread is dead and we have finished with the buffer */
448 /* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we know
449 * there is no writer. */
450 flush_buffer(&shared_trace_info
->channel
.cpu
, FORCE_FLUSH
);
452 ret
= read_subbuffer(&shared_trace_info
->channel
.cpu
, fd_cpu
);
456 flush_buffer(&shared_trace_info
->channel
.facilities
, FORCE_FLUSH
);
458 ret
= read_subbuffer(&shared_trace_info
->channel
.facilities
, fd_fac
);
/* Release the shared control area inherited from the parent. */
464 munmap(shared_trace_info
, sizeof(*shared_trace_info
));
/* Reader-writer initialization */

/* Role of the current process.  The traced application stays
 * LTT_ROLE_WRITER; the forked dump daemon switches itself to
 * LTT_ROLE_READER so the library destructor only runs writer
 * cleanup. */
static enum ltt_process_role {
	LTT_ROLE_WRITER,
	LTT_ROLE_READER
} role = LTT_ROLE_WRITER;
/* Set up tracing for the calling thread: map the shared control area,
 * initialize both channels, block signals, fork the dump daemon, and
 * publish the mapping in thread_trace_info.
 * NOTE(review): extraction garbled — leading integers are original
 * line numbers; missing lines include the opening brace, the
 * declarations of ret and pid, the "pid = fork();" call (before
 * original line 523) and the surrounding if(pid > 0)/else structure,
 * the error-exit paths after each printf, and the final closing
 * brace.  Code left byte-identical; comments only. */
476 void ltt_rw_init(void)
479 struct ltt_trace_info
*shared_trace_info
;
481 sigset_t set
, oldset
;
482 pid_t l_traced_pid
= getpid();
/* NOTE(review): gettid() is not in libc of this era — presumably a
 * syscall wrapper from ltt-usertrace-fast.h; confirm. */
483 pid_t l_traced_tid
= gettid();
485 /* parent : create the shared memory map */
/* MAP_SHARED|MAP_ANONYMOUS: the mapping stays shared with the child
 * created by fork() below. */
486 shared_trace_info
= mmap(0, sizeof(*thread_trace_info
),
487 PROT_READ
|PROT_WRITE
, MAP_SHARED
|MAP_ANONYMOUS
, 0, 0);
488 memset(shared_trace_info
, 0, sizeof(*shared_trace_info
));
489 /* Tricky semaphore : is in a shared memory space, so it's ok for a fast
/* Initialize the "facilities" channel: full flag, sizes, and data
 * start pointer, then write the first sub-buffer header. */
491 atomic_set(&shared_trace_info
->channel
.facilities
.full
, 0);
492 shared_trace_info
->channel
.facilities
.alloc_size
= LTT_BUF_SIZE_FACILITIES
;
493 shared_trace_info
->channel
.facilities
.subbuf_size
= LTT_SUBBUF_SIZE_FACILITIES
;
494 shared_trace_info
->channel
.facilities
.start
=
495 shared_trace_info
->channel
.facilities_buf
;
496 ltt_buffer_begin_callback(&shared_trace_info
->channel
.facilities
,
497 ltt_get_timestamp(), 0);
/* Same initialization for the per-cpu channel. */
499 atomic_set(&shared_trace_info
->channel
.cpu
.full
, 0);
500 shared_trace_info
->channel
.cpu
.alloc_size
= LTT_BUF_SIZE_CPU
;
501 shared_trace_info
->channel
.cpu
.subbuf_size
= LTT_SUBBUF_SIZE_CPU
;
502 shared_trace_info
->channel
.cpu
.start
= shared_trace_info
->channel
.cpu_buf
;
503 ltt_buffer_begin_callback(&shared_trace_info
->channel
.cpu
,
504 ltt_get_timestamp(), 0);
506 shared_trace_info
->init
= 1;
508 /* Disable signals */
/* Block everything so the daemon inherits a clean mask; the original
 * mask is restored below (parent) or inside the daemon (child). */
509 ret
= sigfillset(&set
);
511 printf("LTT Error in sigfillset\n");
515 ret
= pthread_sigmask(SIG_BLOCK
, &set
, &oldset
);
517 printf("LTT Error in pthread_sigmask\n");
/* Parent branch after the (missing) fork(): record the daemon pid
 * and publish the mapping for this thread. */
523 shared_trace_info
->daemon_id
= pid
;
524 thread_trace_info
= shared_trace_info
;
527 ret
= pthread_sigmask(SIG_SETMASK
, &oldset
, NULL
);
529 printf("LTT Error in pthread_sigmask\n");
/* Child branch: become the dump daemon; does not return. */
531 } else if(pid
== 0) {
533 role
= LTT_ROLE_READER
;
534 ltt_usertrace_fast_daemon(shared_trace_info
, oldset
, l_traced_pid
,
536 /* Should never return */
540 perror("LTT Error in forking ltt-usertrace-fast");
/* Per-thread cleanup buffer handed to _pthread_cleanup_push() in
 * ltt_thread_init(). */
static __thread struct _pthread_cleanup_buffer cleanup_buffer;
546 void ltt_thread_init(void)
548 _pthread_cleanup_push(&cleanup_buffer
, ltt_usertrace_fast_cleanup
, NULL
);
/* Library constructor: runs before main() in the traced program.
 * NOTE(review): extraction truncated — the braces are missing, and the
 * original presumably calls ltt_rw_init() after this printf (original
 * lines 555-558 are absent); confirm against the original file. */
552 void __attribute__((constructor
)) __ltt_usertrace_fast_init(void)
554 	printf("LTT usertrace-fast init\n");
559 void __attribute__((destructor
)) __ltt_usertrace_fast_fini(void)
561 if(role
== LTT_ROLE_WRITER
) {
562 printf("LTT usertrace-fast fini\n");
563 ltt_usertrace_fast_cleanup(NULL
);