1 /* Copyright (C) 2009 Pierre-Marc Fournier
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <sys/epoll.h>
23 #include <sys/types.h>
35 #include <ust/ustconsumer.h>
37 #include "usterr_signal_safe.h"
40 #define GET_SUBBUF_OK 1
41 #define GET_SUBBUF_DONE 0
42 #define GET_SUBBUF_DIED 2
44 #define PUT_SUBBUF_OK 1
45 #define PUT_SUBBUF_DIED 0
46 #define PUT_SUBBUF_PUSHED 2
47 #define PUT_SUBBUF_DONE 3
49 #define UNIX_PATH_MAX 108
51 static int get_subbuffer(struct buffer_info
*buf
)
53 struct ustcomm_header _send_hdr
, *send_hdr
;
54 struct ustcomm_header _recv_hdr
, *recv_hdr
;
55 struct ustcomm_buffer_info _send_msg
, _recv_msg
;
56 struct ustcomm_buffer_info
*send_msg
, *recv_msg
;
59 send_hdr
= &_send_hdr
;
60 recv_hdr
= &_recv_hdr
;
61 send_msg
= &_send_msg
;
62 recv_msg
= &_recv_msg
;
64 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
65 buf
->channel
, buf
->channel_cpu
);
70 send_hdr
->command
= GET_SUBBUFFER
;
72 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
73 recv_hdr
, (char *)recv_msg
);
74 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
76 DBG("app died while being traced");
77 return GET_SUBBUF_DIED
;
78 } else if (result
< 0) {
79 ERR("get_subbuffer: ustcomm_req failed");
83 if (!recv_hdr
->result
) {
84 DBG("got subbuffer %s", buf
->name
);
85 buf
->consumed_old
= recv_msg
->consumed_old
;
87 } else if (recv_hdr
->result
== -ENODATA
) {
88 DBG("For buffer %s, the trace was not found. This likely means"
89 " it was destroyed by the user.", buf
->name
);
90 return GET_SUBBUF_DIED
;
93 DBG("error getting subbuffer %s", buf
->name
);
94 return recv_hdr
->result
;
97 static int put_subbuffer(struct buffer_info
*buf
)
99 struct ustcomm_header _send_hdr
, *send_hdr
;
100 struct ustcomm_header _recv_hdr
, *recv_hdr
;
101 struct ustcomm_buffer_info _send_msg
, *send_msg
;
104 send_hdr
= &_send_hdr
;
105 recv_hdr
= &_recv_hdr
;
106 send_msg
= &_send_msg
;
108 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
109 buf
->channel
, buf
->channel_cpu
);
114 send_hdr
->command
= PUT_SUBBUFFER
;
115 send_msg
->consumed_old
= buf
->consumed_old
;
117 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
119 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
121 DBG("app died while being traced");
122 return PUT_SUBBUF_DIED
;
123 } else if (result
< 0) {
124 ERR("put_subbuffer: ustcomm_req failed");
128 if (!recv_hdr
->result
) {
129 DBG("put subbuffer %s", buf
->name
);
130 return PUT_SUBBUF_OK
;
131 } else if (recv_hdr
->result
== -ENODATA
) {
132 DBG("For buffer %s, the trace was not found. This likely means"
133 " it was destroyed by the user.", buf
->name
);
134 return PUT_SUBBUF_DIED
;
137 DBG("error getting subbuffer %s", buf
->name
);
138 return recv_hdr
->result
;
141 void decrement_active_buffers(void *arg
)
143 struct ustconsumer_instance
*instance
= arg
;
144 pthread_mutex_lock(&instance
->mutex
);
145 instance
->active_buffers
--;
146 pthread_mutex_unlock(&instance
->mutex
);
149 static int get_pidunique(int sock
, int64_t *pidunique
)
151 struct ustcomm_header _send_hdr
, *send_hdr
;
152 struct ustcomm_header _recv_hdr
, *recv_hdr
;
153 struct ustcomm_pidunique _recv_msg
, *recv_msg
;
156 send_hdr
= &_send_hdr
;
157 recv_hdr
= &_recv_hdr
;
158 recv_msg
= &_recv_msg
;
160 memset(send_hdr
, 0, sizeof(*send_hdr
));
162 send_hdr
->command
= GET_PIDUNIQUE
;
163 result
= ustcomm_req(sock
, send_hdr
, NULL
, recv_hdr
, (char *)recv_msg
);
167 if (recv_hdr
->result
< 0) {
168 ERR("App responded with error: %s", strerror(recv_hdr
->result
));
169 return recv_hdr
->result
;
172 *pidunique
= recv_msg
->pidunique
;
177 static int get_buf_shmid_pipe_fd(int sock
, struct buffer_info
*buf
,
178 int *buf_shmid
, int *buf_struct_shmid
,
181 struct ustcomm_header _send_hdr
, *send_hdr
;
182 struct ustcomm_header _recv_hdr
, *recv_hdr
;
183 struct ustcomm_buffer_info _send_msg
, *send_msg
;
184 struct ustcomm_buffer_info _recv_msg
, *recv_msg
;
185 int result
, recv_pipe_fd
;
187 send_hdr
= &_send_hdr
;
188 recv_hdr
= &_recv_hdr
;
189 send_msg
= &_send_msg
;
190 recv_msg
= &_recv_msg
;
192 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
193 buf
->channel
, buf
->channel_cpu
);
195 ERR("Failed to pack buffer info");
199 send_hdr
->command
= GET_BUF_SHMID_PIPE_FD
;
201 result
= ustcomm_send(sock
, send_hdr
, (char *)send_msg
);
203 ERR("Failed to send request");
206 result
= ustcomm_recv_fd(sock
, recv_hdr
, (char *)recv_msg
, &recv_pipe_fd
);
208 ERR("Failed to receive message and fd");
211 if (recv_hdr
->result
< 0) {
212 ERR("App responded with error %s", strerror(recv_hdr
->result
));
213 return recv_hdr
->result
;
216 *buf_shmid
= recv_msg
->buf_shmid
;
217 *buf_struct_shmid
= recv_msg
->buf_struct_shmid
;
218 *buf_pipe_fd
= recv_pipe_fd
;
223 static int get_subbuf_num_size(int sock
, struct buffer_info
*buf
,
224 int *subbuf_num
, int *subbuf_size
)
226 struct ustcomm_header _send_hdr
, *send_hdr
;
227 struct ustcomm_header _recv_hdr
, *recv_hdr
;
228 struct ustcomm_channel_info _send_msg
, *send_msg
;
229 struct ustcomm_channel_info _recv_msg
, *recv_msg
;
232 send_hdr
= &_send_hdr
;
233 recv_hdr
= &_recv_hdr
;
234 send_msg
= &_send_msg
;
235 recv_msg
= &_recv_msg
;
237 result
= ustcomm_pack_channel_info(send_hdr
, send_msg
, buf
->trace
,
243 send_hdr
->command
= GET_SUBBUF_NUM_SIZE
;
245 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
246 recv_hdr
, (char *)recv_msg
);
251 *subbuf_num
= recv_msg
->subbuf_num
;
252 *subbuf_size
= recv_msg
->subbuf_size
;
254 return recv_hdr
->result
;
258 static int notify_buffer_mapped(int sock
, struct buffer_info
*buf
)
260 struct ustcomm_header _send_hdr
, *send_hdr
;
261 struct ustcomm_header _recv_hdr
, *recv_hdr
;
262 struct ustcomm_buffer_info _send_msg
, *send_msg
;
265 send_hdr
= &_send_hdr
;
266 recv_hdr
= &_recv_hdr
;
267 send_msg
= &_send_msg
;
269 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
270 buf
->channel
, buf
->channel_cpu
);
275 send_hdr
->command
= NOTIFY_BUF_MAPPED
;
277 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
283 return recv_hdr
->result
;
287 struct buffer_info
*connect_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
288 const char *trace
, const char *channel
,
291 struct buffer_info
*buf
;
293 struct shmid_ds shmds
;
295 buf
= (struct buffer_info
*) zmalloc(sizeof(struct buffer_info
));
297 ERR("add_buffer: insufficient memory");
301 buf
->trace
= strdup(trace
);
306 buf
->channel
= strdup(channel
);
311 result
= asprintf(&buf
->name
, "%s_%d", channel
, channel_cpu
);
312 if (result
< 0 || buf
->name
== NULL
) {
313 goto free_buf_channel
;
316 buf
->channel_cpu
= channel_cpu
;
319 result
= ustcomm_connect_app(buf
->pid
, &buf
->app_sock
);
321 WARN("unable to connect to process, it probably died before we were able to connect");
326 result
= get_pidunique(buf
->app_sock
, &buf
->pidunique
);
328 ERR("Failed to get pidunique");
332 /* get shmid and pipe fd */
333 result
= get_buf_shmid_pipe_fd(buf
->app_sock
, buf
, &buf
->shmid
,
334 &buf
->bufstruct_shmid
, &buf
->pipe_fd
);
336 ERR("Failed to get buf_shmid and pipe_fd");
340 fstat(buf
->pipe_fd
, &temp
);
341 if (!S_ISFIFO(temp
.st_mode
)) {
342 ERR("Didn't receive a fifo from the app");
348 /* get number of subbufs and subbuf size */
349 result
= get_subbuf_num_size(buf
->app_sock
, buf
, &buf
->n_subbufs
,
352 ERR("Failed to get subbuf number and size");
356 /* Set subbuffer's information */
357 buf
->subbuf_size_order
= get_count_order(buf
->subbuf_size
);
358 buf
->alloc_size
= buf
->subbuf_size
* buf
->n_subbufs
;
361 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
362 if(buf
->mem
== (void *) 0) {
366 DBG("successfully attached buffer memory");
368 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
369 if(buf
->bufstruct_mem
== (void *) 0) {
373 DBG("successfully attached buffer bufstruct memory");
375 /* obtain info on the memory segment */
376 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
379 goto shmdt_bufstruct_mem
;
381 buf
->memlen
= shmds
.shm_segsz
;
383 /* Notify the application that we have mapped the buffer */
384 result
= notify_buffer_mapped(buf
->app_sock
, buf
);
386 goto shmdt_bufstruct_mem
;
389 if(instance
->callbacks
->on_open_buffer
)
390 instance
->callbacks
->on_open_buffer(instance
->callbacks
, buf
);
392 pthread_mutex_lock(&instance
->mutex
);
393 instance
->active_buffers
++;
394 pthread_mutex_unlock(&instance
->mutex
);
399 shmdt(buf
->bufstruct_mem
);
408 close(buf
->app_sock
);
424 static void destroy_buffer(struct ustconsumer_callbacks
*callbacks
,
425 struct buffer_info
*buf
)
429 result
= close(buf
->pipe_fd
);
431 WARN("problem closing the pipe fd");
434 result
= close(buf
->app_sock
);
436 WARN("problem calling ustcomm_close_app");
439 result
= shmdt(buf
->mem
);
444 result
= shmdt(buf
->bufstruct_mem
);
449 if(callbacks
->on_close_buffer
)
450 callbacks
->on_close_buffer(callbacks
, buf
);
455 int consumer_loop(struct ustconsumer_instance
*instance
, struct buffer_info
*buf
)
461 pthread_cleanup_push(decrement_active_buffers
, instance
);
464 read_result
= read(buf
->pipe_fd
, &read_buf
, 1);
465 /* get the subbuffer */
466 if (read_result
== 1) {
467 result
= get_subbuffer(buf
);
469 ERR("error getting subbuffer");
471 } else if (result
== GET_SUBBUF_DIED
) {
472 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
475 } else if ((read_result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
477 DBG("App died while being traced");
478 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
480 } else if (read_result
== -1 && errno
== EINTR
) {
484 if(instance
->callbacks
->on_read_subbuffer
)
485 instance
->callbacks
->on_read_subbuffer(instance
->callbacks
, buf
);
487 /* put the subbuffer */
488 result
= put_subbuffer(buf
);
490 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
493 else if(result
== PUT_SUBBUF_PUSHED
) {
494 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
497 else if(result
== PUT_SUBBUF_DIED
) {
498 DBG("application died while putting subbuffer");
499 /* Skip the first subbuffer. We are not sure it is trustable
500 * because the put_subbuffer() did not complete.
502 /* TODO: check on_put_error return value */
503 if(instance
->callbacks
->on_put_error
)
504 instance
->callbacks
->on_put_error(instance
->callbacks
, buf
);
506 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
509 else if(result
== PUT_SUBBUF_DONE
) {
510 /* Done with this subbuffer */
511 /* FIXME: add a case where this branch is used? Upon
512 * normal trace termination, at put_subbuf time, a
513 * special last-subbuffer code could be returned by
518 else if(result
== PUT_SUBBUF_OK
) {
522 DBG("thread for buffer %s is stopping", buf
->name
);
524 /* FIXME: destroy, unalloc... */
526 pthread_cleanup_pop(1);
531 struct consumer_thread_args
{
536 struct ustconsumer_instance
*instance
;
539 void *consumer_thread(void *arg
)
541 struct buffer_info
*buf
;
542 struct consumer_thread_args
*args
= (struct consumer_thread_args
*) arg
;
546 pthread_mutex_lock(&args
->instance
->mutex
);
547 args
->instance
->active_threads
++;
548 pthread_mutex_unlock(&args
->instance
->mutex
);
550 if(args
->instance
->callbacks
->on_new_thread
)
551 args
->instance
->callbacks
->on_new_thread(args
->instance
->callbacks
);
553 /* Block signals that should be handled by the main thread. */
554 result
= sigemptyset(&sigset
);
556 PERROR("sigemptyset");
559 result
= sigaddset(&sigset
, SIGTERM
);
564 result
= sigaddset(&sigset
, SIGINT
);
569 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
571 PERROR("sigprocmask");
575 buf
= connect_buffer(args
->instance
, args
->pid
, args
->trace
,
576 args
->channel
, args
->channel_cpu
);
578 ERR("failed to connect to buffer");
582 consumer_loop(args
->instance
, buf
);
584 destroy_buffer(args
->instance
->callbacks
, buf
);
588 if(args
->instance
->callbacks
->on_close_thread
)
589 args
->instance
->callbacks
->on_close_thread(args
->instance
->callbacks
);
591 pthread_mutex_lock(&args
->instance
->mutex
);
592 args
->instance
->active_threads
--;
593 pthread_mutex_unlock(&args
->instance
->mutex
);
595 free((void *)args
->channel
);
600 int start_consuming_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
601 const char *trace
, const char *channel
,
605 struct consumer_thread_args
*args
;
608 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid
, channel
,
611 args
= (struct consumer_thread_args
*) zmalloc(sizeof(struct consumer_thread_args
));
617 args
->trace
= strdup(trace
);
618 args
->channel
= strdup(channel
);
619 args
->channel_cpu
= channel_cpu
;
620 args
->instance
= instance
;
621 DBG("beginning2 of start_consuming_buffer: args: pid %d trace %s"
622 " bufname %s_%d", args
->pid
, args
->trace
, args
->channel
, args
->channel_cpu
);
624 result
= pthread_create(&thr
, NULL
, consumer_thread
, args
);
626 ERR("pthread_create failed");
629 result
= pthread_detach(thr
);
631 ERR("pthread_detach failed");
634 DBG("end of start_consuming_buffer: args: pid %d trace %s "
635 "bufname %s_%d", args
->pid
, args
->channel
, args
->trace
, args
->channel_cpu
);
639 static void process_client_cmd(int sock
, struct ustcomm_header
*req_header
,
640 char *recvbuf
, struct ustconsumer_instance
*instance
)
643 struct ustcomm_header _res_header
= {0};
644 struct ustcomm_header
*res_header
= &_res_header
;
645 struct ustcomm_buffer_info
*buf_inf
;
647 DBG("Processing client command");
649 switch (req_header
->command
) {
652 buf_inf
= (struct ustcomm_buffer_info
*)recvbuf
;
653 result
= ustcomm_unpack_buffer_info(buf_inf
);
655 ERR("Couldn't unpack buffer info");
659 DBG("Going to consume trace %s buffer %s_%d in process %d",
660 buf_inf
->trace
, buf_inf
->channel
, buf_inf
->ch_cpu
,
662 result
= start_consuming_buffer(instance
, buf_inf
->pid
,
667 ERR("error in add_buffer");
671 res_header
->result
= 0;
674 res_header
->result
= 0;
675 /* Only there to force poll to return */
678 res_header
->result
= -EINVAL
;
679 WARN("unknown command: %d", req_header
->command
);
682 if (ustcomm_send(sock
, res_header
, NULL
) <= 0) {
683 ERR("couldn't send command response");
687 #define MAX_EVENTS 10
689 int ustconsumer_start_instance(struct ustconsumer_instance
*instance
)
691 struct ustcomm_header recv_hdr
;
692 char recv_buf
[USTCOMM_BUFFER_SIZE
];
693 struct ustcomm_sock
*epoll_sock
;
694 struct epoll_event events
[MAX_EVENTS
];
695 struct sockaddr addr
;
696 int result
, epoll_fd
, accept_fd
, nfds
, i
, addr_size
, timeout
;
698 if(!instance
->is_init
) {
699 ERR("libustconsumer instance not initialized");
702 epoll_fd
= instance
->epoll_fd
;
708 nfds
= epoll_wait(epoll_fd
, events
, MAX_EVENTS
, timeout
);
709 if (nfds
== -1 && errno
== EINTR
) {
711 } else if (nfds
== -1) {
712 PERROR("ustconsumer_start_instance: epoll_wait failed");
716 for (i
= 0; i
< nfds
; ++i
) {
717 epoll_sock
= (struct ustcomm_sock
*)events
[i
].data
.ptr
;
718 if (epoll_sock
== instance
->listen_sock
) {
719 addr_size
= sizeof(struct sockaddr
);
720 accept_fd
= accept(epoll_sock
->fd
,
722 (socklen_t
*)&addr_size
);
723 if (accept_fd
== -1) {
724 PERROR("ustconsumer_start_instance: "
728 ustcomm_init_sock(accept_fd
, epoll_fd
,
729 &instance
->connections
);
731 result
= ustcomm_recv(epoll_sock
->fd
, &recv_hdr
,
734 ustcomm_del_sock(epoll_sock
, 0);
736 process_client_cmd(epoll_sock
->fd
,
744 if (instance
->quit_program
) {
745 pthread_mutex_lock(&instance
->mutex
);
746 if (instance
->active_buffers
== 0 && instance
->active_threads
== 0) {
747 pthread_mutex_unlock(&instance
->mutex
);
750 pthread_mutex_unlock(&instance
->mutex
);
755 if(instance
->callbacks
->on_trace_end
)
756 instance
->callbacks
->on_trace_end(instance
);
758 ustconsumer_delete_instance(instance
);
763 /* FIXME: threads and connections !? */
764 void ustconsumer_delete_instance(struct ustconsumer_instance
*instance
)
766 if (instance
->is_init
) {
767 ustcomm_del_named_sock(instance
->listen_sock
, 0);
768 close(instance
->epoll_fd
);
771 pthread_mutex_destroy(&instance
->mutex
);
772 free(instance
->sock_path
);
776 /* FIXME: Do something about the fixed path length, maybe get rid
777 * of the whole concept and use a pipe?
779 int ustconsumer_stop_instance(struct ustconsumer_instance
*instance
, int send_msg
)
787 instance
->quit_program
= 1;
792 /* Send a message through the socket to force poll to return */
794 struct sockaddr_un addr
;
797 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
805 addr
.sun_family
= AF_UNIX
;
807 strncpy(addr
.sun_path
, instance
->sock_path
, UNIX_PATH_MAX
);
808 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
811 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
818 while(bytes
!= sizeof(msg
)) {
819 int inc
= send(fd
, msg
, sizeof(msg
), 0);
820 if (inc
< 0 && errno
!= EINTR
)
831 struct ustconsumer_instance
832 *ustconsumer_new_instance(struct ustconsumer_callbacks
*callbacks
,
835 struct ustconsumer_instance
*instance
=
836 zmalloc(sizeof(struct ustconsumer_instance
));
841 instance
->callbacks
= callbacks
;
842 instance
->quit_program
= 0;
843 instance
->is_init
= 0;
844 instance
->active_buffers
= 0;
845 pthread_mutex_init(&instance
->mutex
, NULL
);
848 instance
->sock_path
= strdup(sock_path
);
850 instance
->sock_path
= NULL
;
856 static int init_ustconsumer_socket(struct ustconsumer_instance
*instance
)
860 if (instance
->sock_path
) {
861 if (asprintf(&name
, "%s", instance
->sock_path
) < 0) {
862 ERR("ustcomm_init_ustconsumer : asprintf failed (sock_path %s)",
863 instance
->sock_path
);
869 /* Only check if socket dir exists if we are using the default directory */
870 result
= ensure_dir_exists(SOCK_DIR
, S_IRWXU
| S_IRWXG
| S_IRWXO
);
872 ERR("Unable to create socket directory %s", SOCK_DIR
);
876 if (asprintf(&name
, "%s/%s", SOCK_DIR
, "ustconsumer") < 0) {
877 ERR("ustcomm_init_ustconsumer : asprintf failed (%s/ustconsumer)",
884 instance
->epoll_fd
= epoll_create(MAX_EVENTS
);
885 if (instance
->epoll_fd
== -1) {
886 ERR("epoll_create failed, start instance bailing");
890 /* Create the named socket */
891 instance
->listen_sock
= ustcomm_init_named_socket(name
,
893 if(!instance
->listen_sock
) {
894 ERR("error initializing named socket at %s", name
);
898 CDS_INIT_LIST_HEAD(&instance
->connections
);
905 close(instance
->epoll_fd
);
912 int ustconsumer_init_instance(struct ustconsumer_instance
*instance
)
915 result
= init_ustconsumer_socket(instance
);
917 ERR("failed to initialize socket");
920 instance
->is_init
= 1;