/* Copyright (C) 2009  Pierre-Marc Fournier
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
21 #include <sys/epoll.h>
23 #include <sys/types.h>
35 #include <ust/ustconsumer.h>
37 #include "usterr_signal_safe.h"
/* Status codes produced by get_subbuffer(). */
#define GET_SUBBUF_OK		1
#define GET_SUBBUF_DONE		0
#define GET_SUBBUF_DIED		2

/* Status codes produced by put_subbuffer() and tested by consumer_loop(). */
#define PUT_SUBBUF_OK		1
#define PUT_SUBBUF_DIED		0
#define PUT_SUBBUF_PUSHED	2
#define PUT_SUBBUF_DONE		3

/* Bound applied when a socket path is copied into sockaddr_un.sun_path. */
#define UNIX_PATH_MAX		108
51 static int get_subbuffer(struct buffer_info
*buf
)
53 struct ustcomm_header _send_hdr
, *send_hdr
;
54 struct ustcomm_header _recv_hdr
, *recv_hdr
;
55 struct ustcomm_buffer_info _send_msg
, _recv_msg
;
56 struct ustcomm_buffer_info
*send_msg
, *recv_msg
;
59 send_hdr
= &_send_hdr
;
60 recv_hdr
= &_recv_hdr
;
61 send_msg
= &_send_msg
;
62 recv_msg
= &_recv_msg
;
64 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
65 buf
->channel
, buf
->channel_cpu
);
70 send_hdr
->command
= GET_SUBBUFFER
;
72 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
73 recv_hdr
, (char *)recv_msg
);
74 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
76 DBG("app died while being traced");
77 return GET_SUBBUF_DIED
;
78 } else if (result
< 0) {
79 ERR("get_subbuffer: ustcomm_req failed");
83 if (!recv_hdr
->result
) {
84 DBG("got subbuffer %s", buf
->name
);
85 buf
->consumed_old
= recv_msg
->consumed_old
;
87 } else if (recv_hdr
->result
== -ENODATA
) {
88 DBG("For buffer %s, the trace was not found. This likely means"
89 " it was destroyed by the user.", buf
->name
);
90 return GET_SUBBUF_DIED
;
93 DBG("error getting subbuffer %s", buf
->name
);
94 return recv_hdr
->result
;
97 static int put_subbuffer(struct buffer_info
*buf
)
99 struct ustcomm_header _send_hdr
, *send_hdr
;
100 struct ustcomm_header _recv_hdr
, *recv_hdr
;
101 struct ustcomm_buffer_info _send_msg
, *send_msg
;
104 send_hdr
= &_send_hdr
;
105 recv_hdr
= &_recv_hdr
;
106 send_msg
= &_send_msg
;
108 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
109 buf
->channel
, buf
->channel_cpu
);
114 send_hdr
->command
= PUT_SUBBUFFER
;
115 send_msg
->consumed_old
= buf
->consumed_old
;
117 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
119 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
121 DBG("app died while being traced");
122 return PUT_SUBBUF_DIED
;
123 } else if (result
< 0) {
124 ERR("put_subbuffer: ustcomm_req failed");
128 if (!recv_hdr
->result
) {
129 DBG("put subbuffer %s", buf
->name
);
130 return PUT_SUBBUF_OK
;
131 } else if (recv_hdr
->result
== -ENODATA
) {
132 DBG("For buffer %s, the trace was not found. This likely means"
133 " it was destroyed by the user.", buf
->name
);
134 return PUT_SUBBUF_DIED
;
137 DBG("error getting subbuffer %s", buf
->name
);
138 return recv_hdr
->result
;
141 void decrement_active_buffers(void *arg
)
143 struct ustconsumer_instance
*instance
= arg
;
144 pthread_mutex_lock(&instance
->mutex
);
145 instance
->active_buffers
--;
146 pthread_mutex_unlock(&instance
->mutex
);
149 static int get_pidunique(int sock
, s64
*pidunique
)
151 struct ustcomm_header _send_hdr
, *send_hdr
;
152 struct ustcomm_header _recv_hdr
, *recv_hdr
;
153 struct ustcomm_pidunique _recv_msg
, *recv_msg
;
156 send_hdr
= &_send_hdr
;
157 recv_hdr
= &_recv_hdr
;
158 recv_msg
= &_recv_msg
;
160 memset(send_hdr
, 0, sizeof(*send_hdr
));
162 send_hdr
->command
= GET_PIDUNIQUE
;
163 result
= ustcomm_req(sock
, send_hdr
, NULL
, recv_hdr
, (char *)recv_msg
);
167 if (recv_hdr
->result
< 0) {
168 ERR("App responded with error: %s", strerror(recv_hdr
->result
));
169 return recv_hdr
->result
;
172 *pidunique
= recv_msg
->pidunique
;
177 static int get_buf_shmid_pipe_fd(int sock
, struct buffer_info
*buf
,
178 int *buf_shmid
, int *buf_struct_shmid
,
181 struct ustcomm_header _send_hdr
, *send_hdr
;
182 struct ustcomm_header _recv_hdr
, *recv_hdr
;
183 struct ustcomm_buffer_info _send_msg
, *send_msg
;
184 struct ustcomm_buffer_info _recv_msg
, *recv_msg
;
185 int result
, recv_pipe_fd
;
187 send_hdr
= &_send_hdr
;
188 recv_hdr
= &_recv_hdr
;
189 send_msg
= &_send_msg
;
190 recv_msg
= &_recv_msg
;
192 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
193 buf
->channel
, buf
->channel_cpu
);
195 ERR("Failed to pack buffer info");
199 send_hdr
->command
= GET_BUF_SHMID_PIPE_FD
;
201 result
= ustcomm_send(sock
, send_hdr
, (char *)send_msg
);
203 ERR("Failed to send request");
206 result
= ustcomm_recv_fd(sock
, recv_hdr
, (char *)recv_msg
, &recv_pipe_fd
);
208 ERR("Failed to receive message and fd");
211 if (recv_hdr
->result
< 0) {
212 ERR("App responded with error %s", strerror(recv_hdr
->result
));
213 return recv_hdr
->result
;
216 *buf_shmid
= recv_msg
->buf_shmid
;
217 *buf_struct_shmid
= recv_msg
->buf_struct_shmid
;
218 *buf_pipe_fd
= recv_pipe_fd
;
223 static int get_subbuf_num_size(int sock
, struct buffer_info
*buf
,
224 int *subbuf_num
, int *subbuf_size
)
226 struct ustcomm_header _send_hdr
, *send_hdr
;
227 struct ustcomm_header _recv_hdr
, *recv_hdr
;
228 struct ustcomm_channel_info _send_msg
, *send_msg
;
229 struct ustcomm_channel_info _recv_msg
, *recv_msg
;
232 send_hdr
= &_send_hdr
;
233 recv_hdr
= &_recv_hdr
;
234 send_msg
= &_send_msg
;
235 recv_msg
= &_recv_msg
;
237 result
= ustcomm_pack_channel_info(send_hdr
, send_msg
, buf
->trace
,
243 send_hdr
->command
= GET_SUBBUF_NUM_SIZE
;
245 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
246 recv_hdr
, (char *)recv_msg
);
251 *subbuf_num
= recv_msg
->subbuf_num
;
252 *subbuf_size
= recv_msg
->subbuf_size
;
254 return recv_hdr
->result
;
258 static int notify_buffer_mapped(int sock
, struct buffer_info
*buf
)
260 struct ustcomm_header _send_hdr
, *send_hdr
;
261 struct ustcomm_header _recv_hdr
, *recv_hdr
;
262 struct ustcomm_buffer_info _send_msg
, *send_msg
;
265 send_hdr
= &_send_hdr
;
266 recv_hdr
= &_recv_hdr
;
267 send_msg
= &_send_msg
;
269 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
270 buf
->channel
, buf
->channel_cpu
);
275 send_hdr
->command
= NOTIFY_BUF_MAPPED
;
277 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
283 return recv_hdr
->result
;
287 struct buffer_info
*connect_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
288 const char *trace
, const char *channel
,
291 struct buffer_info
*buf
;
293 struct shmid_ds shmds
;
295 buf
= (struct buffer_info
*) zmalloc(sizeof(struct buffer_info
));
297 ERR("add_buffer: insufficient memory");
301 buf
->trace
= strdup(trace
);
306 buf
->channel
= strdup(channel
);
311 result
= asprintf(&buf
->name
, "%s_%d", channel
, channel_cpu
);
312 if (result
< 0 || buf
->name
== NULL
) {
313 goto free_buf_channel
;
316 buf
->channel_cpu
= channel_cpu
;
319 result
= ustcomm_connect_app(buf
->pid
, &buf
->app_sock
);
321 WARN("unable to connect to process, it probably died before we were able to connect");
326 result
= get_pidunique(buf
->app_sock
, &buf
->pidunique
);
328 ERR("Failed to get pidunique");
332 /* get shmid and pipe fd */
333 result
= get_buf_shmid_pipe_fd(buf
->app_sock
, buf
, &buf
->shmid
,
334 &buf
->bufstruct_shmid
, &buf
->pipe_fd
);
336 ERR("Failed to get buf_shmid and pipe_fd");
340 fstat(buf
->pipe_fd
, &temp
);
341 if (!S_ISFIFO(temp
.st_mode
)) {
342 ERR("Didn't receive a fifo from the app");
348 /* get number of subbufs and subbuf size */
349 result
= get_subbuf_num_size(buf
->app_sock
, buf
, &buf
->n_subbufs
,
352 ERR("Failed to get subbuf number and size");
356 /* Set subbuffer's information */
357 buf
->subbuf_size_order
= get_count_order(buf
->subbuf_size
);
358 buf
->alloc_size
= buf
->subbuf_size
* buf
->n_subbufs
;
361 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
362 if(buf
->mem
== (void *) 0) {
366 DBG("successfully attached buffer memory");
368 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
369 if(buf
->bufstruct_mem
== (void *) 0) {
373 DBG("successfully attached buffer bufstruct memory");
375 /* obtain info on the memory segment */
376 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
379 goto shmdt_bufstruct_mem
;
381 buf
->memlen
= shmds
.shm_segsz
;
383 /* Notify the application that we have mapped the buffer */
384 result
= notify_buffer_mapped(buf
->app_sock
, buf
);
386 goto shmdt_bufstruct_mem
;
389 if(instance
->callbacks
->on_open_buffer
)
390 instance
->callbacks
->on_open_buffer(instance
->callbacks
, buf
);
392 pthread_mutex_lock(&instance
->mutex
);
393 instance
->active_buffers
++;
394 pthread_mutex_unlock(&instance
->mutex
);
399 shmdt(buf
->bufstruct_mem
);
408 close(buf
->app_sock
);
424 static void destroy_buffer(struct ustconsumer_callbacks
*callbacks
,
425 struct buffer_info
*buf
)
429 result
= close(buf
->pipe_fd
);
431 WARN("problem closing the pipe fd");
434 result
= close(buf
->app_sock
);
436 WARN("problem calling ustcomm_close_app");
439 result
= shmdt(buf
->mem
);
444 result
= shmdt(buf
->bufstruct_mem
);
449 if(callbacks
->on_close_buffer
)
450 callbacks
->on_close_buffer(callbacks
, buf
);
455 int consumer_loop(struct ustconsumer_instance
*instance
, struct buffer_info
*buf
)
461 pthread_cleanup_push(decrement_active_buffers
, instance
);
464 read_result
= read(buf
->pipe_fd
, &read_buf
, 1);
465 /* get the subbuffer */
466 if (read_result
== 1) {
467 result
= get_subbuffer(buf
);
469 ERR("error getting subbuffer");
471 } else if (result
== GET_SUBBUF_DIED
) {
472 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
475 } else if ((read_result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
477 DBG("App died while being traced");
478 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
482 if(instance
->callbacks
->on_read_subbuffer
)
483 instance
->callbacks
->on_read_subbuffer(instance
->callbacks
, buf
);
485 /* put the subbuffer */
486 result
= put_subbuffer(buf
);
488 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
491 else if(result
== PUT_SUBBUF_PUSHED
) {
492 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
495 else if(result
== PUT_SUBBUF_DIED
) {
496 DBG("application died while putting subbuffer");
497 /* Skip the first subbuffer. We are not sure it is trustable
498 * because the put_subbuffer() did not complete.
500 /* TODO: check on_put_error return value */
501 if(instance
->callbacks
->on_put_error
)
502 instance
->callbacks
->on_put_error(instance
->callbacks
, buf
);
504 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
507 else if(result
== PUT_SUBBUF_DONE
) {
508 /* Done with this subbuffer */
509 /* FIXME: add a case where this branch is used? Upon
510 * normal trace termination, at put_subbuf time, a
511 * special last-subbuffer code could be returned by
516 else if(result
== PUT_SUBBUF_OK
) {
520 DBG("thread for buffer %s is stopping", buf
->name
);
522 /* FIXME: destroy, unalloc... */
524 pthread_cleanup_pop(1);
/*
 * Arguments handed from start_consuming_buffer() to consumer_thread().
 *
 * NOTE(review): only the instance member was visible in the reviewed
 * excerpt; the other members are restored from the assignments made in
 * start_consuming_buffer() and should be confirmed against the upstream
 * file.
 */
struct consumer_thread_args {
	pid_t pid;		/* process that owns the buffer */
	const char *trace;	/* strdup()'ed trace name */
	const char *channel;	/* strdup()'ed channel name */
	int channel_cpu;	/* per-cpu index of the channel buffer */
	struct ustconsumer_instance *instance;
};
537 void *consumer_thread(void *arg
)
539 struct buffer_info
*buf
;
540 struct consumer_thread_args
*args
= (struct consumer_thread_args
*) arg
;
544 if(args
->instance
->callbacks
->on_new_thread
)
545 args
->instance
->callbacks
->on_new_thread(args
->instance
->callbacks
);
547 /* Block signals that should be handled by the main thread. */
548 result
= sigemptyset(&sigset
);
550 PERROR("sigemptyset");
553 result
= sigaddset(&sigset
, SIGTERM
);
558 result
= sigaddset(&sigset
, SIGINT
);
563 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
565 PERROR("sigprocmask");
569 buf
= connect_buffer(args
->instance
, args
->pid
, args
->trace
,
570 args
->channel
, args
->channel_cpu
);
572 ERR("failed to connect to buffer");
576 consumer_loop(args
->instance
, buf
);
578 destroy_buffer(args
->instance
->callbacks
, buf
);
582 if(args
->instance
->callbacks
->on_close_thread
)
583 args
->instance
->callbacks
->on_close_thread(args
->instance
->callbacks
);
585 free((void *)args
->channel
);
590 int start_consuming_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
591 const char *trace
, const char *channel
,
595 struct consumer_thread_args
*args
;
598 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid
, channel
,
601 args
= (struct consumer_thread_args
*) zmalloc(sizeof(struct consumer_thread_args
));
607 args
->trace
= strdup(trace
);
608 args
->channel
= strdup(channel
);
609 args
->channel_cpu
= channel_cpu
;
610 args
->instance
= instance
;
611 DBG("beginning2 of start_consuming_buffer: args: pid %d trace %s"
612 " bufname %s_%d", args
->pid
, args
->trace
, args
->channel
, args
->channel_cpu
);
614 result
= pthread_create(&thr
, NULL
, consumer_thread
, args
);
616 ERR("pthread_create failed");
619 result
= pthread_detach(thr
);
621 ERR("pthread_detach failed");
624 DBG("end of start_consuming_buffer: args: pid %d trace %s "
625 "bufname %s_%d", args
->pid
, args
->channel
, args
->trace
, args
->channel_cpu
);
/*
 * Handle one request received from a client connection and answer it on
 * @sock.
 *
 * The response header starts out zeroed. The switch on req_header->command
 * has three outcomes in the visible code:
 *   - a branch that treats @recvbuf as a struct ustcomm_buffer_info, unpacks
 *     it with ustcomm_unpack_buffer_info(), starts a consumer for it through
 *     start_consuming_buffer() and sets the response result to 0;
 *   - a branch that only sets the response result to 0 (it exists to force
 *     poll to return);
 *   - a fallback that sets the response result to -EINVAL and warns about
 *     the unknown command.
 * The response header is then sent with ustcomm_send(); a return value <= 0
 * is logged as an error.
 *
 * NOTE(review): the case labels of the switch and the bodies of the error
 * branches are not visible in this excerpt, so the code below is left
 * exactly as found.
 */
629 static void process_client_cmd(int sock
, struct ustcomm_header
*req_header
,
630 char *recvbuf
, struct ustconsumer_instance
*instance
)
633 struct ustcomm_header _res_header
= {0};
634 struct ustcomm_header
*res_header
= &_res_header
;
635 struct ustcomm_buffer_info
*buf_inf
;
637 DBG("Processing client command");
639 switch (req_header
->command
) {
642 buf_inf
= (struct ustcomm_buffer_info
*)recvbuf
;
643 result
= ustcomm_unpack_buffer_info(buf_inf
);
645 ERR("Couldn't unpack buffer info");
649 DBG("Going to consume trace %s buffer %s_%d in process %d",
650 buf_inf
->trace
, buf_inf
->channel
, buf_inf
->ch_cpu
,
652 result
= start_consuming_buffer(instance
, buf_inf
->pid
,
657 ERR("error in add_buffer");
661 res_header
->result
= 0;
664 res_header
->result
= 0;
665 /* Only there to force poll to return */
668 res_header
->result
= -EINVAL
;
669 WARN("unknown command: %d", req_header
->command
);
672 if (ustcomm_send(sock
, res_header
, NULL
) <= 0) {
673 ERR("couldn't send command response");
677 #define MAX_EVENTS 10
679 int ustconsumer_start_instance(struct ustconsumer_instance
*instance
)
681 struct ustcomm_header recv_hdr
;
682 char recv_buf
[USTCOMM_BUFFER_SIZE
];
683 struct ustcomm_sock
*epoll_sock
;
684 struct epoll_event events
[MAX_EVENTS
];
685 struct sockaddr addr
;
686 int result
, epoll_fd
, accept_fd
, nfds
, i
, addr_size
, timeout
;
688 if(!instance
->is_init
) {
689 ERR("libustconsumer instance not initialized");
692 epoll_fd
= instance
->epoll_fd
;
698 nfds
= epoll_wait(epoll_fd
, events
, MAX_EVENTS
, timeout
);
699 if (nfds
== -1 && errno
== EINTR
) {
701 } else if (nfds
== -1) {
702 PERROR("ustconsumer_start_instance: epoll_wait failed");
706 for (i
= 0; i
< nfds
; ++i
) {
707 epoll_sock
= (struct ustcomm_sock
*)events
[i
].data
.ptr
;
708 if (epoll_sock
== instance
->listen_sock
) {
709 addr_size
= sizeof(struct sockaddr
);
710 accept_fd
= accept(epoll_sock
->fd
,
712 (socklen_t
*)&addr_size
);
713 if (accept_fd
== -1) {
714 PERROR("ustconsumer_start_instance: "
718 ustcomm_init_sock(accept_fd
, epoll_fd
,
719 &instance
->connections
);
721 result
= ustcomm_recv(epoll_sock
->fd
, &recv_hdr
,
724 ustcomm_del_sock(epoll_sock
, 0);
726 process_client_cmd(epoll_sock
->fd
,
734 if (instance
->quit_program
) {
735 pthread_mutex_lock(&instance
->mutex
);
736 if(instance
->active_buffers
== 0) {
737 pthread_mutex_unlock(&instance
->mutex
);
740 pthread_mutex_unlock(&instance
->mutex
);
745 if(instance
->callbacks
->on_trace_end
)
746 instance
->callbacks
->on_trace_end(instance
);
748 ustconsumer_delete_instance(instance
);
753 /* FIXME: threads and connections !? */
754 void ustconsumer_delete_instance(struct ustconsumer_instance
*instance
)
756 if (instance
->is_init
) {
757 ustcomm_del_named_sock(instance
->listen_sock
, 0);
758 close(instance
->epoll_fd
);
761 pthread_mutex_destroy(&instance
->mutex
);
762 free(instance
->sock_path
);
766 /* FIXME: Do something about the fixed path length, maybe get rid
767 * of the whole concept and use a pipe?
769 int ustconsumer_stop_instance(struct ustconsumer_instance
*instance
, int send_msg
)
777 instance
->quit_program
= 1;
782 /* Send a message through the socket to force poll to return */
784 struct sockaddr_un addr
;
786 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
792 addr
.sun_family
= AF_UNIX
;
794 strncpy(addr
.sun_path
, instance
->sock_path
, UNIX_PATH_MAX
);
795 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
797 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
802 while(bytes
!= sizeof(msg
))
803 bytes
+= send(fd
, msg
, sizeof(msg
), 0);
810 struct ustconsumer_instance
811 *ustconsumer_new_instance(struct ustconsumer_callbacks
*callbacks
,
814 struct ustconsumer_instance
*instance
=
815 zmalloc(sizeof(struct ustconsumer_instance
));
820 instance
->callbacks
= callbacks
;
821 instance
->quit_program
= 0;
822 instance
->is_init
= 0;
823 instance
->active_buffers
= 0;
824 pthread_mutex_init(&instance
->mutex
, NULL
);
827 instance
->sock_path
= strdup(sock_path
);
829 instance
->sock_path
= NULL
;
835 static int init_ustconsumer_socket(struct ustconsumer_instance
*instance
)
839 if (instance
->sock_path
) {
840 if (asprintf(&name
, "%s", instance
->sock_path
) < 0) {
841 ERR("ustcomm_init_ustconsumer : asprintf failed (sock_path %s)",
842 instance
->sock_path
);
848 /* Only check if socket dir exists if we are using the default directory */
849 result
= ensure_dir_exists(SOCK_DIR
, S_IRWXU
| S_IRWXG
| S_IRWXO
);
851 ERR("Unable to create socket directory %s", SOCK_DIR
);
855 if (asprintf(&name
, "%s/%s", SOCK_DIR
, "ustconsumer") < 0) {
856 ERR("ustcomm_init_ustconsumer : asprintf failed (%s/ustconsumer)",
863 instance
->epoll_fd
= epoll_create(MAX_EVENTS
);
864 if (instance
->epoll_fd
== -1) {
865 ERR("epoll_create failed, start instance bailing");
869 /* Create the named socket */
870 instance
->listen_sock
= ustcomm_init_named_socket(name
,
872 if(!instance
->listen_sock
) {
873 ERR("error initializing named socket at %s", name
);
877 CDS_INIT_LIST_HEAD(&instance
->connections
);
884 close(instance
->epoll_fd
);
891 int ustconsumer_init_instance(struct ustconsumer_instance
*instance
)
894 result
= init_ustconsumer_socket(instance
);
896 ERR("failed to initialize socket");
899 instance
->is_init
= 1;