1 /* Copyright (C) 2009 Pierre-Marc Fournier
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <sys/epoll.h>
23 #include <sys/types.h>
40 /* return value: 0 = subbuffer is finished, it won't produce data anymore
41 * 1 = got subbuffer successfully
45 #define GET_SUBBUF_OK 1
46 #define GET_SUBBUF_DONE 0
47 #define GET_SUBBUF_DIED 2
49 #define PUT_SUBBUF_OK 1
50 #define PUT_SUBBUF_DIED 0
51 #define PUT_SUBBUF_PUSHED 2
52 #define PUT_SUBBUF_DONE 3
54 #define UNIX_PATH_MAX 108
56 int get_subbuffer(struct buffer_info
*buf
)
59 char *received_msg
=NULL
;
64 if (asprintf(&send_msg
, "get_subbuffer %s", buf
->name
) < 0) {
65 ERR("get_subbuffer : asprintf failed (%s)",
71 result
= ustcomm_send_request(buf
->app_sock
, send_msg
, &received_msg
);
72 if((result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) || result
== 0) {
73 DBG("app died while being traced");
74 retval
= GET_SUBBUF_DIED
;
78 ERR("get_subbuffer: ustcomm_send_request failed");
83 result
= sscanf(received_msg
, "%as %ld", &rep_code
, &buf
->consumed_old
);
84 if(result
!= 2 && result
!= 1) {
85 ERR("unable to parse response to get_subbuffer");
91 if (!strcmp(rep_code
, "OK")) {
92 DBG("got subbuffer %s", buf
->name
);
93 retval
= GET_SUBBUF_OK
;
94 } else if(!strcmp(received_msg
, "NOTFOUND")) {
95 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf
->name
);
96 retval
= GET_SUBBUF_DIED
;
99 DBG("error getting subbuffer %s", buf
->name
);
103 /* FIXME: free correctly the stuff */
116 int put_subbuffer(struct buffer_info
*buf
)
119 char *received_msg
=NULL
;
124 if (asprintf(&send_msg
, "put_subbuffer %s %ld", buf
->name
, buf
->consumed_old
) < 0) {
125 ERR("put_subbuffer : asprintf failed (%s %ld)",
126 buf
->name
, buf
->consumed_old
);
130 result
= ustcomm_send_request(buf
->app_sock
, send_msg
, &received_msg
);
131 if(result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) {
132 retval
= PUT_SUBBUF_DIED
;
135 else if(result
< 0) {
136 ERR("put_subbuffer: send_message failed");
140 else if(result
== 0) {
141 /* Program seems finished. However this might not be
142 * the last subbuffer that has to be collected.
144 retval
= PUT_SUBBUF_DIED
;
148 result
= sscanf(received_msg
, "%as", &rep_code
);
150 ERR("unable to parse response to put_subbuffer");
155 if(!strcmp(rep_code
, "OK")) {
156 DBG("subbuffer put %s", buf
->name
);
157 retval
= PUT_SUBBUF_OK
;
159 else if(!strcmp(received_msg
, "NOTFOUND")) {
160 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf
->name
);
161 /* However, maybe this was not the last subbuffer. So
162 * we return the program died.
164 retval
= PUT_SUBBUF_DIED
;
168 DBG("put_subbuffer: received error, we were pushed");
169 retval
= PUT_SUBBUF_PUSHED
;
186 void decrement_active_buffers(void *arg
)
188 struct libustd_instance
*instance
= arg
;
189 pthread_mutex_lock(&instance
->mutex
);
190 instance
->active_buffers
--;
191 pthread_mutex_unlock(&instance
->mutex
);
194 struct buffer_info
*connect_buffer(struct libustd_instance
*instance
, pid_t pid
, const char *bufname
)
196 struct buffer_info
*buf
;
200 struct shmid_ds shmds
;
201 struct ustcomm_header header
;
203 buf
= (struct buffer_info
*) zmalloc(sizeof(struct buffer_info
));
205 ERR("add_buffer: insufficient memory");
212 /* FIXME: Fix all the freeing and exit sequence from this functions */
214 result
= ustcomm_connect_app(buf
->pid
, &buf
->app_sock
);
216 WARN("unable to connect to process, it probably died before we were able to connect");
221 if (asprintf(&send_msg
, "get_pidunique") < 0) {
222 ERR("connect_buffer : asprintf failed (get_pidunique)");
225 result
= ustcomm_send_request(buf
->app_sock
, send_msg
, &received_msg
);
228 ERR("problem in ustcomm_send_request(get_pidunique)");
235 result
= sscanf(received_msg
, "%lld", &buf
->pidunique
);
237 ERR("unable to parse response to get_pidunique");
241 DBG("got pidunique %lld", buf
->pidunique
);
244 if (asprintf(&send_msg
, "get_shmid %s", buf
->name
) < 0) {
245 ERR("connect_buffer : asprintf failed (get_schmid %s)",
249 result
= ustcomm_send_request(buf
->app_sock
, send_msg
, &received_msg
);
252 ERR("problem in ustcomm_send_request(get_shmid)");
259 result
= sscanf(received_msg
, "%d %d", &buf
->shmid
, &buf
->bufstruct_shmid
);
261 ERR("unable to parse response to get_shmid (\"%s\")", received_msg
);
265 DBG("got shmids %d %d", buf
->shmid
, buf
->bufstruct_shmid
);
268 if (asprintf(&send_msg
, "get_n_subbufs %s", buf
->name
) < 0) {
269 ERR("connect_buffer : asprintf failed (get_n_subbufs %s)",
273 result
= ustcomm_send_request(buf
->app_sock
, send_msg
, &received_msg
);
276 ERR("problem in ustcomm_send_request(g_n_subbufs)");
283 result
= sscanf(received_msg
, "%d", &buf
->n_subbufs
);
285 ERR("unable to parse response to get_n_subbufs");
289 DBG("got n_subbufs %d", buf
->n_subbufs
);
291 /* get subbuf size */
292 if (asprintf(&send_msg
, "get_subbuf_size %s", buf
->name
) < 0) {
293 ERR("connect_buffer : asprintf failed (get_subbuf_size %s)",
297 result
= ustcomm_send_request(buf
->app_sock
, send_msg
, &received_msg
);
300 ERR("problem in ustcomm_send_request(get_subbuf_size)");
307 result
= sscanf(received_msg
, "%d", &buf
->subbuf_size
);
309 ERR("unable to parse response to get_subbuf_size");
313 DBG("got subbuf_size %d", buf
->subbuf_size
);
316 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
317 if(buf
->mem
== (void *) 0) {
321 DBG("successfully attached buffer memory");
323 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
324 if(buf
->bufstruct_mem
== (void *) 0) {
328 DBG("successfully attached buffer bufstruct memory");
330 /* obtain info on the memory segment */
331 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
336 buf
->memlen
= shmds
.shm_segsz
;
338 /* get buffer pipe fd */
339 memset(&header
, 0, sizeof(header
));
340 if (asprintf(&send_msg
, "get_buffer_fd %s", buf
->name
) < 0) {
341 ERR("connect_buffer : asprintf failed (get_buffer_fd %s)",
345 header
.size
= strlen(send_msg
) + 1;
346 result
= ustcomm_send(buf
->app_sock
, &header
, send_msg
);
349 ERR("ustcomm_send failed.");
352 result
= ustcomm_recv_fd(buf
->app_sock
, &header
, NULL
, &buf
->pipe_fd
);
354 ERR("ustcomm_recv_fd failed");
358 fstat(buf
->pipe_fd
, &temp
);
359 if (!S_ISFIFO(temp
.st_mode
)) {
360 ERR("Didn't receive a fifo from the app");
364 if(instance
->callbacks
->on_open_buffer
)
365 instance
->callbacks
->on_open_buffer(instance
->callbacks
, buf
);
367 pthread_mutex_lock(&instance
->mutex
);
368 instance
->active_buffers
++;
369 pthread_mutex_unlock(&instance
->mutex
);
378 static void destroy_buffer(struct libustd_callbacks
*callbacks
,
379 struct buffer_info
*buf
)
383 result
= close(buf
->app_sock
);
385 WARN("problem calling ustcomm_close_app");
388 result
= shmdt(buf
->mem
);
393 result
= shmdt(buf
->bufstruct_mem
);
398 if(callbacks
->on_close_buffer
)
399 callbacks
->on_close_buffer(callbacks
, buf
);
404 int consumer_loop(struct libustd_instance
*instance
, struct buffer_info
*buf
)
406 int result
, read_result
;
409 pthread_cleanup_push(decrement_active_buffers
, instance
);
412 read_result
= read(buf
->pipe_fd
, &read_buf
, 1);
413 /* get the subbuffer */
414 if (read_result
== 1) {
415 result
= get_subbuffer(buf
);
417 ERR("error getting subbuffer");
419 } else if (result
== GET_SUBBUF_DIED
) {
420 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
423 } else if ((read_result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
425 DBG("App died while being traced");
426 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
430 if(instance
->callbacks
->on_read_subbuffer
)
431 instance
->callbacks
->on_read_subbuffer(instance
->callbacks
, buf
);
433 /* put the subbuffer */
434 result
= put_subbuffer(buf
);
436 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
439 else if(result
== PUT_SUBBUF_PUSHED
) {
440 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
443 else if(result
== PUT_SUBBUF_DIED
) {
444 DBG("application died while putting subbuffer");
445 /* Skip the first subbuffer. We are not sure it is trustable
446 * because the put_subbuffer() did not complete.
448 if(instance
->callbacks
->on_put_error
)
449 instance
->callbacks
->on_put_error(instance
->callbacks
, buf
);
451 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
454 else if(result
== PUT_SUBBUF_DONE
) {
455 /* Done with this subbuffer */
456 /* FIXME: add a case where this branch is used? Upon
457 * normal trace termination, at put_subbuf time, a
458 * special last-subbuffer code could be returned by
463 else if(result
== PUT_SUBBUF_OK
) {
467 DBG("thread for buffer %s is stopping", buf
->name
);
469 /* FIXME: destroy, unalloc... */
471 pthread_cleanup_pop(1);
476 struct consumer_thread_args
{
479 struct libustd_instance
*instance
;
482 void *consumer_thread(void *arg
)
484 struct buffer_info
*buf
;
485 struct consumer_thread_args
*args
= (struct consumer_thread_args
*) arg
;
489 DBG("GOT ARGS: pid %d bufname %s", args
->pid
, args
->bufname
);
491 if(args
->instance
->callbacks
->on_new_thread
)
492 args
->instance
->callbacks
->on_new_thread(args
->instance
->callbacks
);
494 /* Block signals that should be handled by the main thread. */
495 result
= sigemptyset(&sigset
);
497 PERROR("sigemptyset");
500 result
= sigaddset(&sigset
, SIGTERM
);
505 result
= sigaddset(&sigset
, SIGINT
);
510 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
512 PERROR("sigprocmask");
516 buf
= connect_buffer(args
->instance
, args
->pid
, args
->bufname
);
518 ERR("failed to connect to buffer");
522 consumer_loop(args
->instance
, buf
);
524 destroy_buffer(args
->instance
->callbacks
, buf
);
528 if(args
->instance
->callbacks
->on_close_thread
)
529 args
->instance
->callbacks
->on_close_thread(args
->instance
->callbacks
);
531 free((void *)args
->bufname
);
536 int start_consuming_buffer(
537 struct libustd_instance
*instance
, pid_t pid
, const char *bufname
)
540 struct consumer_thread_args
*args
;
543 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid
, bufname
);
545 args
= (struct consumer_thread_args
*) zmalloc(sizeof(struct consumer_thread_args
));
548 args
->bufname
= strdup(bufname
);
549 args
->instance
= instance
;
550 DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args
->pid
, args
->bufname
);
552 result
= pthread_create(&thr
, NULL
, consumer_thread
, args
);
554 ERR("pthread_create failed");
557 result
= pthread_detach(thr
);
559 ERR("pthread_detach failed");
562 DBG("end of start_consuming_buffer: args: pid %d bufname %s", args
->pid
, args
->bufname
);
566 static void process_client_cmd(char *recvbuf
, struct libustd_instance
*instance
)
568 if(!strncmp(recvbuf
, "collect", 7)) {
570 char *bufname
= NULL
;
573 result
= sscanf(recvbuf
, "%*s %d %50as", &pid
, &bufname
);
575 ERR("parsing error: %s", recvbuf
);
579 result
= start_consuming_buffer(instance
, pid
, bufname
);
581 ERR("error in add_buffer");
589 } else if(!strncmp(recvbuf
, "exit", 4)) {
590 /* Only there to force poll to return */
592 WARN("unknown command: %s", recvbuf
);
596 #define MAX_EVENTS 10
598 int libustd_start_instance(struct libustd_instance
*instance
)
600 struct ustcomm_sock
*epoll_sock
;
601 struct epoll_event events
[MAX_EVENTS
];
602 struct sockaddr addr
;
603 int result
, epoll_fd
, accept_fd
, nfds
, i
, addr_size
, timeout
;
605 if(!instance
->is_init
) {
606 ERR("libustd instance not initialized");
609 epoll_fd
= instance
->epoll_fd
;
615 nfds
= epoll_wait(epoll_fd
, events
, MAX_EVENTS
, timeout
);
616 if (nfds
== -1 && errno
== EINTR
) {
618 } else if (nfds
== -1) {
619 PERROR("libustd_start_instance: epoll_wait failed");
623 for (i
= 0; i
< nfds
; ++i
) {
624 epoll_sock
= (struct ustcomm_sock
*)events
[i
].data
.ptr
;
625 if (epoll_sock
== instance
->listen_sock
) {
626 addr_size
= sizeof(struct sockaddr
);
627 accept_fd
= accept(epoll_sock
->fd
,
629 (socklen_t
*)&addr_size
);
630 if (accept_fd
== -1) {
631 PERROR("libustd_start_instance: "
635 ustcomm_init_sock(accept_fd
, epoll_fd
,
636 &instance
->connections
);
639 result
= recv_message_conn(epoll_sock
->fd
, &msg
);
641 ustcomm_del_sock(epoll_sock
, 0);
643 process_client_cmd(msg
, instance
);
650 if (instance
->quit_program
) {
651 pthread_mutex_lock(&instance
->mutex
);
652 if(instance
->active_buffers
== 0) {
653 pthread_mutex_unlock(&instance
->mutex
);
656 pthread_mutex_unlock(&instance
->mutex
);
661 if(instance
->callbacks
->on_trace_end
)
662 instance
->callbacks
->on_trace_end(instance
);
664 libustd_delete_instance(instance
);
669 /* FIXME: threads and connections !? */
670 void libustd_delete_instance(struct libustd_instance
*instance
)
672 if (instance
->is_init
) {
673 ustcomm_del_named_sock(instance
->listen_sock
, 0);
674 close(instance
->epoll_fd
);
677 pthread_mutex_destroy(&instance
->mutex
);
678 free(instance
->sock_path
);
682 int libustd_stop_instance(struct libustd_instance
*instance
, int send_msg
)
690 instance
->quit_program
= 1;
695 /* Send a message through the socket to force poll to return */
697 struct sockaddr_un addr
;
699 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
705 addr
.sun_family
= AF_UNIX
;
707 strncpy(addr
.sun_path
, instance
->sock_path
, UNIX_PATH_MAX
);
708 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
710 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
715 while(bytes
!= sizeof(msg
))
716 bytes
+= send(fd
, msg
, sizeof(msg
), 0);
723 struct libustd_instance
724 *libustd_new_instance(struct libustd_callbacks
*callbacks
,
727 struct libustd_instance
*instance
=
728 zmalloc(sizeof(struct libustd_instance
));
733 instance
->callbacks
= callbacks
;
734 instance
->quit_program
= 0;
735 instance
->is_init
= 0;
736 instance
->active_buffers
= 0;
737 pthread_mutex_init(&instance
->mutex
, NULL
);
740 instance
->sock_path
= strdup(sock_path
);
742 instance
->sock_path
= NULL
;
748 static int init_ustd_socket(struct libustd_instance
*instance
)
752 if (instance
->sock_path
) {
753 if (asprintf(&name
, "%s", instance
->sock_path
) < 0) {
754 ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
755 instance
->sock_path
);
761 /* Only check if socket dir exists if we are using the default directory */
762 result
= ensure_dir_exists(SOCK_DIR
);
764 ERR("Unable to create socket directory %s", SOCK_DIR
);
768 if (asprintf(&name
, "%s/%s", SOCK_DIR
, "ustd") < 0) {
769 ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
776 instance
->epoll_fd
= epoll_create(MAX_EVENTS
);
777 if (instance
->epoll_fd
== -1) {
778 ERR("epoll_create failed, start instance bailing");
782 /* Create the named socket */
783 instance
->listen_sock
= ustcomm_init_named_socket(name
,
785 if(!instance
->listen_sock
) {
786 ERR("error initializing named socket at %s", name
);
790 INIT_LIST_HEAD(&instance
->connections
);
797 close(instance
->epoll_fd
);
804 int libustd_init_instance(struct libustd_instance
*instance
)
807 result
= init_ustd_socket(instance
);
809 ERR("failed to initialize socket");
812 instance
->is_init
= 1;
This page took 0.065947 seconds and 4 git commands to generate.