1 /* Copyright (C) 2009 Pierre-Marc Fournier
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include <sys/types.h>
39 /* return value of get_subbuffer(): 0 = subbuffer is finished, it won't produce data anymore
40 * 1 = got subbuffer successfully
* 2 = the traced application died (GET_SUBBUF_DIED)
*/
/* Result codes of get_subbuffer():
 *   GET_SUBBUF_OK   - the reply's status word was "OK"
 *   GET_SUBBUF_DONE - the reply's first token was "END", or the process was
 *                     already destroyed when the request was sent
 *   GET_SUBBUF_DIED - the request returned 0 ("app died while being traced")
 */
44 #define GET_SUBBUF_OK 1
45 #define GET_SUBBUF_DONE 0
46 #define GET_SUBBUF_DIED 2
/* Result codes of put_subbuffer():
 *   PUT_SUBBUF_OK     - the reply's status word was "OK"
 *   PUT_SUBBUF_DIED   - the request failed with errno == ECONNRESET
 *   PUT_SUBBUF_PUSHED - logged as "received error, we were pushed"
 *
 * The numeric values of the two families do not line up (0 is DONE for
 * get but DIED for put), so always compare against the names.
 */
48 #define PUT_SUBBUF_OK 1
49 #define PUT_SUBBUF_DIED 0
50 #define PUT_SUBBUF_PUSHED 2
/* Base directory under which trace files are created. Starts out NULL;
 * add_buffer() assigns USTD_DEFAULT_TRACE_PATH to it (under a condition
 * that is not visible in this excerpt) and builds per-process paths
 * beneath it. parse_args() checks that it names a directory. */
53 char *trace_path
=NULL
;
55 /* Number of active buffers and the mutex to protect it. */
56 int active_buffers
= 0;
57 pthread_mutex_t active_buffers_mutex
= PTHREAD_MUTEX_INITIALIZER
;
58 /* Whether a request to end the program was received. */
/* NOTE(review): declared sig_atomic_t, presumably because it is written
 * from sigterm_handler() - the handler body is not visible here; confirm. */
59 sig_atomic_t terminate_req
= 0;
/* Check, without blocking, whether a SIGPIPE is pending.
 *
 * A signal set holding only SIGPIPE is built and handed to sigtimedwait()
 * with a {0,0} timeout, so the call returns immediately. Three outcomes are
 * told apart: nothing pending (-1 with errno == EAGAIN), a genuine
 * sigtimedwait() failure (any other -1), and SIGPIPE received.
 *
 * NOTE(review): the return statements are not visible in this excerpt, so
 * the value reported for each outcome must be confirmed in the full source.
 */
61 int test_sigpipe(void)
/* Start from an empty set ... */
66 result
= sigemptyset(&sigset
);
68 PERROR("sigemptyset");
/* ... and add SIGPIPE as its only member. */
71 result
= sigaddset(&sigset
, SIGPIPE
);
/* Zero timeout: poll for the signal instead of waiting for it. */
77 result
= sigtimedwait(&sigset
, NULL
, &(struct timespec
){0,0});
78 if(result
== -1 && errno
== EAGAIN
) {
79 /* no signal received */
82 else if(result
== -1) {
83 PERROR("sigtimedwait");
/* On success sigtimedwait() returns the signal number it accepted. */
86 else if(result
== SIGPIPE
) {
87 /* received sigpipe */
/* Ask the traced application for the next subbuffer of channel buf->name.
 *
 * Sends "get_subbuffer <name>" over buf->conn and interprets the reply:
 *   - status word "OK"            -> GET_SUBBUF_OK; the offset that follows
 *                                    it is stored in buf->consumed_old
 *   - first token "END"           -> GET_SUBBUF_DONE
 *   - request returned 0          -> GET_SUBBUF_DIED
 *   - process already destroyed   -> GET_SUBBUF_DONE (with a warning)
 * Other failures are logged with ERR()/DBG().
 *
 * NOTE(review): several branch conditions and the return statement are not
 * visible in this excerpt; the value returned on the error paths should be
 * confirmed in the full source.
 */
95 int get_subbuffer(struct buffer_info
*buf
)
98 char *received_msg
=NULL
;
/* NOTE(review): the asprintf() result is not checked in the visible code. */
103 asprintf(&send_msg
, "get_subbuffer %s", buf
->name
);
104 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
106 WARN("process %d destroyed before we could connect to it", buf
->pid
);
107 retval
= GET_SUBBUF_DONE
;
110 else if(result
< 0) {
111 ERR("get_subbuffer: ustcomm_send_request failed");
115 else if(result
== 0) {
116 DBG("app died while being traced");
117 retval
= GET_SUBBUF_DIED
;
/* Reply format: "<status word> <offset>". The "%as" conversion is
 * presumably the GNU allocating form (sscanf allocates rep_code). */
121 result
= sscanf(received_msg
, "%as %ld", &rep_code
, &buf
->consumed_old
);
/* A single conversion (status word only) is accepted as well as two;
 * in that case buf->consumed_old is left unchanged by sscanf(). */
122 if(result
!= 2 && result
!= 1) {
123 ERR("unable to parse response to get_subbuffer");
128 DBG("received msg is %s", received_msg
);
130 if(!strcmp(rep_code
, "OK")) {
131 DBG("got subbuffer %s", buf
->name
);
132 retval
= GET_SUBBUF_OK
;
134 else if(nth_token_is(received_msg
, "END", 0) == 1) {
135 retval
= GET_SUBBUF_DONE
;
139 DBG("error getting subbuffer %s", buf
->name
);
143 /* FIXME: free correctly the stuff */
/* Tell the traced application that the subbuffer at buf->consumed_old has
 * been consumed.
 *
 * Sends "put_subbuffer <name> <consumed_old>" over buf->conn and maps the
 * outcome to a PUT_SUBBUF_* code:
 *   - request failed with errno == ECONNRESET -> PUT_SUBBUF_DIED
 *   - status word "OK"                        -> PUT_SUBBUF_OK
 *   - otherwise ("we were pushed")            -> PUT_SUBBUF_PUSHED
 *
 * NOTE(review): the conditions guarding the two ERR() calls and the return
 * statement are not visible in this excerpt; confirm what is returned on
 * those error paths.
 */
156 int put_subbuffer(struct buffer_info
*buf
)
159 char *received_msg
=NULL
;
/* NOTE(review): the asprintf() result is not checked in the visible code. */
164 asprintf(&send_msg
, "put_subbuffer %s %ld", buf
->name
, buf
->consumed_old
);
165 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
/* A reset connection is reported as the application having died. */
166 if(result
< 0 && errno
== ECONNRESET
) {
167 retval
= PUT_SUBBUF_DIED
;
171 ERR("put_subbuffer: send_message failed");
/* Only the status word of the reply is parsed. */
176 result
= sscanf(received_msg
, "%as", &rep_code
);
178 ERR("unable to parse response to put_subbuffer");
183 if(!strcmp(rep_code
, "OK")) {
184 DBG("subbuffer put %s", buf
->name
);
185 retval
= PUT_SUBBUF_OK
;
188 DBG("put_subbuffer: received error, we were pushed");
189 retval
= PUT_SUBBUF_PUSHED
;
206 /* This write is patient because it restarts if it was incomplete. */
/* Write count bytes from buf to fd, restarting after short writes.
 *
 * bufc is a byte cursor into buf that is passed to write(); the value
 * returned at the end is the distance the cursor has travelled from buf,
 * i.e. the number of bytes written.
 *
 * NOTE(review): the loop, the cursor/count updates and the handling of a
 * failing or zero-length write() are not visible in this excerpt; confirm
 * the error return and whether EINTR is retried.
 */
209 ssize_t
patient_write(int fd
, const void *buf
, size_t count
)
/* Byte-typed alias so the position can be advanced with pointer arithmetic. */
211 const char *bufc
= (const char *) buf
;
215 result
= write(fd
, bufc
, count
);
/* Bytes written = how far the cursor moved from the start of buf. */
227 return bufc
-(const char *)buf
;
/* Thread cleanup handler registered by consumer_thread() through
 * pthread_cleanup_push(); arg is passed as NULL there.
 *
 * Takes and releases active_buffers_mutex. NOTE(review): the statement
 * executed while the mutex is held is not visible in this excerpt -
 * presumably it decrements active_buffers, as the name says; confirm.
 */
230 void decrement_active_buffers(void *arg
)
232 pthread_mutex_lock(&active_buffers_mutex
);
234 pthread_mutex_unlock(&active_buffers_mutex
);
/* Thread entry point that drains one trace buffer into its output file.
 *
 * arg is the struct buffer_info prepared by add_buffer(). For each
 * subbuffer the thread:
 *   1. obtains it with get_subbuffer(),
 *   2. writes buf->subbuf_size bytes of shared memory to buf->file_fd,
 *   3. releases it with put_subbuffer().
 * When the application is reported dead (GET_SUBBUF_DIED / PUT_SUBBUF_DIED)
 * finish_consuming_dead_subbuffer() is called on the buffer.
 * decrement_active_buffers() is registered as cleanup handler and is run
 * by the final pthread_cleanup_pop(1).
 *
 * NOTE(review): the loop construct, the break/continue statements of the
 * individual branches and the return value are not visible in this excerpt.
 */
237 void *consumer_thread(void *arg
)
239 struct buffer_info
*buf
= (struct buffer_info
*) arg
;
242 pthread_cleanup_push(decrement_active_buffers
, NULL
);
245 /* get the subbuffer */
246 result
= get_subbuffer(buf
);
248 ERR("error getting subbuffer");
251 else if(result
== GET_SUBBUF_DONE
) {
255 else if(result
== GET_SUBBUF_DIED
) {
256 finish_consuming_dead_subbuffer(buf
);
260 /* write data to file */
/* The source offset is consumed_old masked with (total buffer size - 1).
 * NOTE(review): the mask only wraps correctly when
 * n_subbufs * subbuf_size is a power of two - confirm that the producer
 * guarantees this. */
261 result
= patient_write(buf
->file_fd
, buf
->mem
+ (buf
->consumed_old
& (buf
->n_subbufs
* buf
->subbuf_size
-1)), buf
->subbuf_size
);
264 /* FIXME: maybe drop this trace */
267 /* put the subbuffer */
268 result
= put_subbuffer(buf
);
270 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
273 else if(result
== PUT_SUBBUF_PUSHED
) {
274 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
277 else if(result
== PUT_SUBBUF_DIED
) {
278 WARN("application died while putting subbuffer");
279 /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
280 finish_consuming_dead_subbuffer(buf
);
282 else if(result
== PUT_SUBBUF_OK
) {
286 DBG("thread for buffer %s is stopping", buf
->name
);
288 /* FIXME: destroy, unalloc... */
/* A non-zero argument makes pthread_cleanup_pop() run the handler pushed
 * above (decrement_active_buffers) as well as removing it. */
290 pthread_cleanup_pop(1);
/* Create directory dir with mode 0777 (subject to the process umask).
 *
 * A mkdir() failure whose errno is EEXIST is distinguished from every
 * other failure, so an already-existing path is not treated like the
 * other errors. add_buffer() uses the result to decide whether to log
 * "could not create directory".
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * confirm the success/failure values. EEXIST is also reported when the
 * path exists but is not a directory.
 */
295 int create_dir_if_needed(char *dir
)
298 result
= mkdir(dir
, 0777);
300 if(errno
!= EEXIST
) {
/* Test whether dir names an existing directory.
 *
 * The path is examined with stat() (so symbolic links are followed) and
 * the resulting mode is checked with S_ISDIR(). parse_args() treats a
 * zero result as "Not a valid directory".
 *
 * NOTE(review): the return statements and the handling of a failing
 * stat() are not visible in this excerpt.
 */
309 int is_directory(const char *dir
)
314 result
= stat(dir
, &st
);
320 if(!S_ISDIR(st
.st_mode
)) {
/* Start collecting channel bufname of process pid.
 *
 * Visible steps, in order:
 *   1. allocate a struct buffer_info;
 *   2. connect to the application with ustcomm_connect_app();
 *   3. query it for "get_pidunique", "get_shmid <name>",
 *      "get_n_subbufs <name>" and "get_subbuf_size <name>", parsing each
 *      reply into the matching buf field;
 *   4. attach the two shared memory segments (buf->mem and
 *      buf->bufstruct_mem) and read the data segment's size into
 *      buf->memlen via shmctl(IPC_STAT);
 *   5. create the output directory "<trace_path>/<pid>_<pidunique>" and
 *      open "<trace_path>/<pid>_<pidunique>/<name>_0" for writing
 *      (mode 0600, must not already exist);
 *   6. take active_buffers_mutex briefly and start consumer_thread() on buf.
 *
 * NOTE(review), from the visible lines only:
 *   - asprintf() results are never checked;
 *   - the pid is formatted with "%u" although buf->pid is set from a pid_t
 *     elsewhere - confirm the field type;
 *   - O_TRUNC has no effect together with O_CREAT | O_EXCL;
 *   - the pthread_create() result is not assigned;
 *   - the statement executed under active_buffers_mutex, the error-path
 *     returns/cleanup and the return value are not visible in this excerpt.
 */
327 int add_buffer(pid_t pid
, char *bufname
)
329 struct buffer_info
*buf
;
336 struct shmid_ds shmds
;
338 buf
= (struct buffer_info
*) malloc(sizeof(struct buffer_info
));
340 ERR("add_buffer: insufficient memory");
348 result
= ustcomm_connect_app(buf
->pid
, &buf
->conn
);
350 WARN("unable to connect to process, it probably died before we were able to connect");
/* Query 1: per-process unique id, used in the output directory name. */
355 asprintf(&send_msg
, "get_pidunique");
356 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
359 ERR("problem in ustcomm_send_request(get_pidunique)");
363 result
= sscanf(received_msg
, "%lld", &buf
->pidunique
);
365 ERR("unable to parse response to get_pidunique");
369 DBG("got pidunique %lld", buf
->pidunique
);
/* Query 2: ids of the data segment and of the buffer-structure segment. */
372 asprintf(&send_msg
, "get_shmid %s", buf
->name
);
373 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
376 ERR("problem in ustcomm_send_request(get_shmid)");
380 result
= sscanf(received_msg
, "%d %d", &buf
->shmid
, &buf
->bufstruct_shmid
);
382 ERR("unable to parse response to get_shmid");
386 DBG("got shmids %d %d", buf
->shmid
, buf
->bufstruct_shmid
);
/* Query 3: number of subbuffers in the channel. */
389 asprintf(&send_msg
, "get_n_subbufs %s", buf
->name
);
390 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
393 ERR("problem in ustcomm_send_request(g_n_subbufs)");
397 result
= sscanf(received_msg
, "%d", &buf
->n_subbufs
);
399 ERR("unable to parse response to get_n_subbufs");
403 DBG("got n_subbufs %d", buf
->n_subbufs
);
405 /* get subbuf size */
406 asprintf(&send_msg
, "get_subbuf_size %s", buf
->name
);
/* NOTE(review): unlike the three requests above, the return value of this
 * request is discarded, so a failed request goes straight to sscanf(). */
407 ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
410 result
= sscanf(received_msg
, "%d", &buf
->subbuf_size
);
412 ERR("unable to parse response to get_subbuf_size");
416 DBG("got subbuf_size %d", buf
->subbuf_size
);
419 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
/* NOTE(review): shmat() reports failure by returning (void *) -1, not a
 * null pointer, so this comparison does not detect a failed attach. */
420 if(buf
->mem
== (void *) 0) {
424 DBG("successfully attached buffer memory");
426 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
/* NOTE(review): same issue as above - failure is (void *) -1. */
427 if(buf
->bufstruct_mem
== (void *) 0) {
431 DBG("successfully attached buffer bufstruct memory");
433 /* obtain info on the memory segment */
434 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
439 buf
->memlen
= shmds
.shm_segsz
;
441 /* open file for output */
443 /* Only create the directory if using the default path, because
444 * of the risk of typo when using trace path override. We don't
445 * want to risk creating plenty of useless directories in that case.
447 result
= create_dir_if_needed(USTD_DEFAULT_TRACE_PATH
);
449 ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH
);
453 trace_path
= USTD_DEFAULT_TRACE_PATH
;
456 asprintf(&tmp
, "%s/%u_%lld", trace_path
, buf
->pid
, buf
->pidunique
);
457 result
= create_dir_if_needed(tmp
);
459 ERR("could not create directory %s", tmp
);
465 asprintf(&tmp
, "%s/%u_%lld/%s_0", trace_path
, buf
->pid
, buf
->pidunique
, buf
->name
);
466 result
= fd
= open(tmp
, O_WRONLY
| O_CREAT
| O_TRUNC
| O_EXCL
, 00600);
469 ERR("failed opening trace file %s", tmp
);
475 pthread_mutex_lock(&active_buffers_mutex
);
477 pthread_mutex_unlock(&active_buffers_mutex
);
479 pthread_create(&thr
, NULL
, consumer_thread
, buf
);
/* Print the command-line help text to stderr.
 *
 * NOTE(review): the header of the enclosing function is not visible in
 * this excerpt. The text lists -h, -o and -s only; the "version" long
 * option declared in parse_args() is not mentioned here.
 */
486 fprintf(stderr
, "Usage:\nustd OPTIONS\n\nOptions:\n"
487 "\t-h\t\tDisplay this usage.\n"
488 "\t-o DIR\t\tSpecify the directory where to output the traces.\n"
489 "\t-s PATH\t\tSpecify the path to use for the daemon socket.\n");
/* Parse the daemon's command line with getopt_long().
 *
 * Short options accepted: -h, -s ARG, -o ARG (option string "hs:o:").
 * The visible long-option table declares "version", mapped to 'V'; since
 * 'V' is absent from the short option string it is reachable only as
 * --version and prints "Version 0.0".
 * trace_path is validated with is_directory() and rejected with
 * "Not a valid directory." otherwise.
 *
 * NOTE(review): the switch/case labels, the assignments made for -s and
 * -o, the remaining long_options entries and the return values are not
 * visible in this excerpt.
 */
492 int parse_args(int argc
, char **argv
)
/* Filled in by getopt_long() with the index of the matched long option. */
497 int option_index
= 0;
498 static struct option long_options
[] = {
500 {"version", 0, 0, 'V'},
504 c
= getopt_long(argc
, argv
, "hs:o:", long_options
, &option_index
);
510 printf("option %s", long_options
[option_index
].name
);
512 printf(" with arg %s", optarg
);
520 if(!is_directory(trace_path
)) {
521 ERR("Not a valid directory. (%s)", trace_path
);
529 printf("Version 0.0\n");
533 /* unknown option or other error; error is
534 printed by getopt, just return */
/* Signal handler that main() installs for SIGTERM via sigaction().
 *
 * NOTE(review): the body is not visible in this excerpt. Presumably it
 * records the request in terminate_req ("Whether a request to end the
 * program was received") - confirm in the full source.
 */
542 void sigterm_handler(int sig
)
/* Daemon entry point.
 *
 * Visible steps, in order:
 *   1. install sigterm_handler() for SIGTERM with SA_RESTART;
 *   2. parse the command line with parse_args();
 *   3. open the daemon socket with ustcomm_init_ustd(&ustd, sock_path);
 *   4. block SIGPIPE for the process with sigprocmask(SIG_BLOCK);
 *   5. receive messages with ustcomm_ustd_recv_message(); a message that
 *      starts with "collect" is parsed as "collect <pid> <bufname>" and
 *      handed to add_buffer();
 *   6. inspect active_buffers under active_buffers_mutex.
 *
 * NOTE(review): the loop construct, the condition under which the
 * active_buffers == 0 test leads to termination, the error-path returns
 * and the end of the function are not visible in this excerpt.
 */
547 int main(int argc
, char **argv
)
549 struct ustcomm_ustd ustd
;
554 result
= sigemptyset(&sigset
);
556 PERROR("sigemptyset");
559 sa
.sa_handler
= sigterm_handler
;
/* SA_RESTART: system calls interrupted by SIGTERM are restarted rather
 * than failing with EINTR. */
561 sa
.sa_flags
= SA_RESTART
;
562 result
= sigaction(SIGTERM
, &sa
, NULL
);
568 result
= parse_args(argc
, argv
);
573 result
= ustcomm_init_ustd(&ustd
, sock_path
);
575 ERR("failed to initialize socket");
579 /* block SIGPIPE for the process (no handler is installed here) */
580 result
= sigemptyset(&sigset
);
582 PERROR("sigemptyset");
585 result
= sigaddset(&sigset
, SIGPIPE
);
590 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
592 PERROR("sigprocmask");
600 /* check for requests on our public socket */
/* NOTE(review): the final argument (100) is presumably a timeout - confirm
 * its meaning and unit in ustcomm. */
601 result
= ustcomm_ustd_recv_message(&ustd
, &recvbuf
, NULL
, 100);
603 ERR("error in ustcomm_ustd_recv_message");
/* Only the 7-character prefix is compared, so any message beginning with
 * "collect" is accepted. */
607 if(!strncmp(recvbuf
, "collect", 7)) {
/* Skip the command word, then read the pid and a buffer name of at most
 * 50 characters; "%50as" is presumably the GNU allocating conversion. */
612 result
= sscanf(recvbuf
, "%*s %d %50as", &pid
, &bufname
);
614 fprintf(stderr
, "parsing error: %s\n", recvbuf
);
617 result
= add_buffer(pid
, bufname
);
619 ERR("error in add_buffer");
628 pthread_mutex_lock(&active_buffers_mutex
);
629 if(active_buffers
== 0) {
630 pthread_mutex_unlock(&active_buffers_mutex
);
633 pthread_mutex_unlock(&active_buffers_mutex
);