999e4dadfcf65da35e4f669e7e0ca71fb3e837ef
1 /* Copyright (C) 2009 Pierre-Marc Fournier
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
/* Return codes of get_subbuffer():
 *   GET_SUBBUF_DONE (0) = subbuffer is finished, it won't produce data anymore
 *   GET_SUBBUF_OK   (1) = got subbuffer successfully
 *   GET_SUBBUF_DIED (2) = the traced application died, or the trace was
 *                         not found
 */
#define GET_SUBBUF_OK 1
#define GET_SUBBUF_DONE 0
#define GET_SUBBUF_DIED 2

/* Return codes of put_subbuffer():
 *   PUT_SUBBUF_DIED   (0) = the application died or the trace was not found
 *   PUT_SUBBUF_OK     (1) = subbuffer was put back successfully
 *   PUT_SUBBUF_PUSHED (2) = the application replied with an error: the
 *                           reader was pushed (buffer overflow)
 *   PUT_SUBBUF_DONE   (3) = done with this subbuffer; handled by
 *                           consumer_loop(), whose FIXME notes that no
 *                           case returns it yet
 */
#define PUT_SUBBUF_OK 1
#define PUT_SUBBUF_DIED 0
#define PUT_SUBBUF_PUSHED 2
#define PUT_SUBBUF_DONE 3

/* Bound used when copying a socket path into sockaddr_un.sun_path. */
#define UNIX_PATH_MAX 108
53 int get_subbuffer(struct buffer_info
*buf
)
56 char *received_msg
=NULL
;
61 if (asprintf(&send_msg
, "get_subbuffer %s", buf
->name
) < 0) {
62 ERR("get_subbuffer : asprintf failed (%s)",
67 result
= ustcomm_send_request(buf
->conn
, send_msg
, &received_msg
);
68 if((result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) || result
== 0) {
69 DBG("app died while being traced");
70 retval
= GET_SUBBUF_DIED
;
74 ERR("get_subbuffer: ustcomm_send_request failed");
79 result
= sscanf(received_msg
, "%as %ld", &rep_code
, &buf
->consumed_old
);
80 if(result
!= 2 && result
!= 1) {
81 ERR("unable to parse response to get_subbuffer");
87 if(!strcmp(rep_code
, "OK")) {
88 DBG("got subbuffer %s", buf
->name
);
89 retval
= GET_SUBBUF_OK
;
91 else if(nth_token_is(received_msg
, "END", 0) == 1) {
92 retval
= GET_SUBBUF_DONE
;
95 else if(!strcmp(received_msg
, "NOTFOUND")) {
96 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf
->name
);
97 retval
= GET_SUBBUF_DIED
;
101 DBG("error getting subbuffer %s", buf
->name
);
105 /* FIXME: free correctly the stuff */
118 int put_subbuffer(struct buffer_info
*buf
)
121 char *received_msg
=NULL
;
126 if (asprintf(&send_msg
, "put_subbuffer %s %ld", buf
->name
, buf
->consumed_old
) < 0) {
127 ERR("put_subbuffer : asprintf failed (%s %ld)",
128 buf
->name
, buf
->consumed_old
);
132 result
= ustcomm_send_request(buf
->conn
, send_msg
, &received_msg
);
133 if(result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) {
134 retval
= PUT_SUBBUF_DIED
;
137 else if(result
< 0) {
138 ERR("put_subbuffer: send_message failed");
142 else if(result
== 0) {
143 /* Program seems finished. However this might not be
144 * the last subbuffer that has to be collected.
146 retval
= PUT_SUBBUF_DIED
;
150 result
= sscanf(received_msg
, "%as", &rep_code
);
152 ERR("unable to parse response to put_subbuffer");
157 if(!strcmp(rep_code
, "OK")) {
158 DBG("subbuffer put %s", buf
->name
);
159 retval
= PUT_SUBBUF_OK
;
161 else if(!strcmp(received_msg
, "NOTFOUND")) {
162 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf
->name
);
163 /* However, maybe this was not the last subbuffer. So
164 * we return the program died.
166 retval
= PUT_SUBBUF_DIED
;
170 DBG("put_subbuffer: received error, we were pushed");
171 retval
= PUT_SUBBUF_PUSHED
;
188 void decrement_active_buffers(void *arg
)
190 struct libustd_instance
*instance
= arg
;
191 pthread_mutex_lock(&instance
->mutex
);
192 instance
->active_buffers
--;
193 pthread_mutex_unlock(&instance
->mutex
);
196 struct buffer_info
*connect_buffer(struct libustd_instance
*instance
, pid_t pid
, const char *bufname
)
198 struct buffer_info
*buf
;
202 struct shmid_ds shmds
;
204 buf
= (struct buffer_info
*) zmalloc(sizeof(struct buffer_info
));
206 ERR("add_buffer: insufficient memory");
210 buf
->conn
= malloc(sizeof(struct ustcomm_connection
));
211 if(buf
->conn
== NULL
) {
212 ERR("add_buffer: insufficient memory");
221 result
= ustcomm_connect_app(buf
->pid
, buf
->conn
);
223 WARN("unable to connect to process, it probably died before we were able to connect");
228 if (asprintf(&send_msg
, "get_pidunique") < 0) {
229 ERR("connect_buffer : asprintf failed (get_pidunique)");
232 result
= ustcomm_send_request(buf
->conn
, send_msg
, &received_msg
);
235 ERR("problem in ustcomm_send_request(get_pidunique)");
242 result
= sscanf(received_msg
, "%lld", &buf
->pidunique
);
244 ERR("unable to parse response to get_pidunique");
248 DBG("got pidunique %lld", buf
->pidunique
);
251 if (asprintf(&send_msg
, "get_shmid %s", buf
->name
) < 0) {
252 ERR("connect_buffer : asprintf failed (get_schmid %s)",
256 result
= ustcomm_send_request(buf
->conn
, send_msg
, &received_msg
);
259 ERR("problem in ustcomm_send_request(get_shmid)");
266 result
= sscanf(received_msg
, "%d %d", &buf
->shmid
, &buf
->bufstruct_shmid
);
268 ERR("unable to parse response to get_shmid (\"%s\")", received_msg
);
272 DBG("got shmids %d %d", buf
->shmid
, buf
->bufstruct_shmid
);
275 if (asprintf(&send_msg
, "get_n_subbufs %s", buf
->name
) < 0) {
276 ERR("connect_buffer : asprintf failed (get_n_subbufs %s)",
280 result
= ustcomm_send_request(buf
->conn
, send_msg
, &received_msg
);
283 ERR("problem in ustcomm_send_request(g_n_subbufs)");
290 result
= sscanf(received_msg
, "%d", &buf
->n_subbufs
);
292 ERR("unable to parse response to get_n_subbufs");
296 DBG("got n_subbufs %d", buf
->n_subbufs
);
298 /* get subbuf size */
299 if (asprintf(&send_msg
, "get_subbuf_size %s", buf
->name
) < 0) {
300 ERR("connect_buffer : asprintf failed (get_subbuf_size %s)",
304 result
= ustcomm_send_request(buf
->conn
, send_msg
, &received_msg
);
307 ERR("problem in ustcomm_send_request(get_subbuf_size)");
314 result
= sscanf(received_msg
, "%d", &buf
->subbuf_size
);
316 ERR("unable to parse response to get_subbuf_size");
320 DBG("got subbuf_size %d", buf
->subbuf_size
);
323 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
324 if(buf
->mem
== (void *) 0) {
328 DBG("successfully attached buffer memory");
330 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
331 if(buf
->bufstruct_mem
== (void *) 0) {
335 DBG("successfully attached buffer bufstruct memory");
337 /* obtain info on the memory segment */
338 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
343 buf
->memlen
= shmds
.shm_segsz
;
345 if(instance
->callbacks
->on_open_buffer
)
346 instance
->callbacks
->on_open_buffer(instance
->callbacks
, buf
);
348 pthread_mutex_lock(&instance
->mutex
);
349 instance
->active_buffers
++;
350 pthread_mutex_unlock(&instance
->mutex
);
359 static void destroy_buffer(struct libustd_callbacks
*callbacks
,
360 struct buffer_info
*buf
)
364 result
= ustcomm_close_app(buf
->conn
);
366 WARN("problem calling ustcomm_close_app");
369 result
= shmdt(buf
->mem
);
374 result
= shmdt(buf
->bufstruct_mem
);
379 if(callbacks
->on_close_buffer
)
380 callbacks
->on_close_buffer(callbacks
, buf
);
386 int consumer_loop(struct libustd_instance
*instance
, struct buffer_info
*buf
)
390 pthread_cleanup_push(decrement_active_buffers
, instance
);
393 /* get the subbuffer */
394 result
= get_subbuffer(buf
);
396 ERR("error getting subbuffer");
399 else if(result
== GET_SUBBUF_DONE
) {
403 else if(result
== GET_SUBBUF_DIED
) {
404 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
408 if(instance
->callbacks
->on_read_subbuffer
)
409 instance
->callbacks
->on_read_subbuffer(instance
->callbacks
, buf
);
411 /* put the subbuffer */
412 result
= put_subbuffer(buf
);
414 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
417 else if(result
== PUT_SUBBUF_PUSHED
) {
418 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
421 else if(result
== PUT_SUBBUF_DIED
) {
422 DBG("application died while putting subbuffer");
423 /* Skip the first subbuffer. We are not sure it is trustable
424 * because the put_subbuffer() did not complete.
426 if(instance
->callbacks
->on_put_error
)
427 instance
->callbacks
->on_put_error(instance
->callbacks
, buf
);
429 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
432 else if(result
== PUT_SUBBUF_DONE
) {
433 /* Done with this subbuffer */
434 /* FIXME: add a case where this branch is used? Upon
435 * normal trace termination, at put_subbuf time, a
436 * special last-subbuffer code could be returned by
441 else if(result
== PUT_SUBBUF_OK
) {
445 DBG("thread for buffer %s is stopping", buf
->name
);
447 /* FIXME: destroy, unalloc... */
449 pthread_cleanup_pop(1);
454 struct consumer_thread_args
{
457 struct libustd_instance
*instance
;
460 void *consumer_thread(void *arg
)
462 struct buffer_info
*buf
;
463 struct consumer_thread_args
*args
= (struct consumer_thread_args
*) arg
;
467 DBG("GOT ARGS: pid %d bufname %s", args
->pid
, args
->bufname
);
469 if(args
->instance
->callbacks
->on_new_thread
)
470 args
->instance
->callbacks
->on_new_thread(args
->instance
->callbacks
);
472 /* Block signals that should be handled by the main thread. */
473 result
= sigemptyset(&sigset
);
475 PERROR("sigemptyset");
478 result
= sigaddset(&sigset
, SIGTERM
);
483 result
= sigaddset(&sigset
, SIGINT
);
488 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
490 PERROR("sigprocmask");
494 buf
= connect_buffer(args
->instance
, args
->pid
, args
->bufname
);
496 ERR("failed to connect to buffer");
500 consumer_loop(args
->instance
, buf
);
502 destroy_buffer(args
->instance
->callbacks
, buf
);
506 if(args
->instance
->callbacks
->on_close_thread
)
507 args
->instance
->callbacks
->on_close_thread(args
->instance
->callbacks
);
509 free((void *)args
->bufname
);
514 int start_consuming_buffer(
515 struct libustd_instance
*instance
, pid_t pid
, const char *bufname
)
518 struct consumer_thread_args
*args
;
521 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid
, bufname
);
523 args
= (struct consumer_thread_args
*) zmalloc(sizeof(struct consumer_thread_args
));
526 args
->bufname
= strdup(bufname
);
527 args
->instance
= instance
;
528 DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args
->pid
, args
->bufname
);
530 result
= pthread_create(&thr
, NULL
, consumer_thread
, args
);
532 ERR("pthread_create failed");
535 result
= pthread_detach(thr
);
537 ERR("pthread_detach failed");
540 DBG("end of start_consuming_buffer: args: pid %d bufname %s", args
->pid
, args
->bufname
);
545 int libustd_start_instance(struct libustd_instance
*instance
)
550 if(!instance
->is_init
) {
551 ERR("libustd instance not initialized");
559 /* check for requests on our public socket */
560 result
= ustcomm_ustd_recv_message(instance
->comm
, &recvbuf
, NULL
, timeout
);
561 if(result
== -1 && errno
== EINTR
) {
564 else if(result
== -1) {
565 ERR("error in ustcomm_ustd_recv_message");
568 else if(result
> 0) {
569 if(!strncmp(recvbuf
, "collect", 7)) {
574 result
= sscanf(recvbuf
, "%*s %d %50as", &pid
, &bufname
);
576 ERR("parsing error: %s", recvbuf
);
580 result
= start_consuming_buffer(instance
, pid
, bufname
);
582 ERR("error in add_buffer");
589 else if(!strncmp(recvbuf
, "exit", 4)) {
590 /* Only there to force poll to return */
593 WARN("unknown command: %s", recvbuf
);
601 if(instance
->quit_program
) {
602 pthread_mutex_lock(&instance
->mutex
);
603 if(instance
->active_buffers
== 0) {
604 pthread_mutex_unlock(&instance
->mutex
);
607 pthread_mutex_unlock(&instance
->mutex
);
612 if(instance
->callbacks
->on_trace_end
)
613 instance
->callbacks
->on_trace_end(instance
);
615 libustd_delete_instance(instance
);
620 void libustd_delete_instance(struct libustd_instance
*instance
)
622 if(instance
->is_init
)
623 ustcomm_fini_ustd(instance
->comm
);
625 pthread_mutex_destroy(&instance
->mutex
);
626 free(instance
->sock_path
);
627 free(instance
->comm
);
631 int libustd_stop_instance(struct libustd_instance
*instance
, int send_msg
)
639 instance
->quit_program
= 1;
644 /* Send a message through the socket to force poll to return */
646 struct sockaddr_un addr
;
648 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
654 addr
.sun_family
= AF_UNIX
;
656 strncpy(addr
.sun_path
, instance
->sock_path
, UNIX_PATH_MAX
);
657 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
659 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
664 while(bytes
!= sizeof(msg
))
665 bytes
+= send(fd
, msg
, sizeof(msg
), 0);
672 struct libustd_instance
*libustd_new_instance(
673 struct libustd_callbacks
*callbacks
, char *sock_path
)
675 struct libustd_instance
*instance
=
676 zmalloc(sizeof(struct libustd_instance
));
680 instance
->comm
= malloc(sizeof(struct ustcomm_ustd
));
681 if(!instance
->comm
) {
686 instance
->callbacks
= callbacks
;
687 instance
->quit_program
= 0;
688 instance
->is_init
= 0;
689 instance
->active_buffers
= 0;
690 pthread_mutex_init(&instance
->mutex
, NULL
);
693 instance
->sock_path
= strdup(sock_path
);
695 instance
->sock_path
= NULL
;
700 int libustd_init_instance(struct libustd_instance
*instance
)
703 result
= ustcomm_init_ustd(instance
->comm
, instance
->sock_path
);
705 ERR("failed to initialize socket");
708 instance
->is_init
= 1;
This page took 0.144453 seconds and 4 git commands to generate.