1 /* Copyright (C) 2009 Pierre-Marc Fournier
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <sys/epoll.h>
23 #include <sys/types.h>
35 #include <ust/ustconsumer.h>
40 #define GET_SUBBUF_OK 1
41 #define GET_SUBBUF_DONE 0
42 #define GET_SUBBUF_DIED 2
44 #define PUT_SUBBUF_OK 1
45 #define PUT_SUBBUF_DIED 0
46 #define PUT_SUBBUF_PUSHED 2
47 #define PUT_SUBBUF_DONE 3
49 #define UNIX_PATH_MAX 108
51 static int get_subbuffer(struct buffer_info
*buf
)
53 struct ustcomm_header _send_hdr
, *send_hdr
;
54 struct ustcomm_header _recv_hdr
, *recv_hdr
;
55 struct ustcomm_buffer_info _send_msg
, _recv_msg
;
56 struct ustcomm_buffer_info
*send_msg
, *recv_msg
;
59 send_hdr
= &_send_hdr
;
60 recv_hdr
= &_recv_hdr
;
61 send_msg
= &_send_msg
;
62 recv_msg
= &_recv_msg
;
64 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
65 buf
->channel
, buf
->channel_cpu
);
70 send_hdr
->command
= GET_SUBBUFFER
;
72 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
73 recv_hdr
, (char *)recv_msg
);
74 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
76 DBG("app died while being traced");
77 return GET_SUBBUF_DIED
;
78 } else if (result
< 0) {
79 ERR("get_subbuffer: ustcomm_req failed");
83 if (!recv_hdr
->result
) {
84 DBG("got subbuffer %s", buf
->name
);
85 buf
->consumed_old
= recv_msg
->consumed_old
;
87 } else if (recv_hdr
->result
== -ENODATA
) {
88 DBG("For buffer %s, the trace was not found. This likely means"
89 " it was destroyed by the user.", buf
->name
);
90 return GET_SUBBUF_DIED
;
93 DBG("error getting subbuffer %s", buf
->name
);
94 return recv_hdr
->result
;
97 static int put_subbuffer(struct buffer_info
*buf
)
99 struct ustcomm_header _send_hdr
, *send_hdr
;
100 struct ustcomm_header _recv_hdr
, *recv_hdr
;
101 struct ustcomm_buffer_info _send_msg
, *send_msg
;
104 send_hdr
= &_send_hdr
;
105 recv_hdr
= &_recv_hdr
;
106 send_msg
= &_send_msg
;
108 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
109 buf
->channel
, buf
->channel_cpu
);
114 send_hdr
->command
= PUT_SUBBUFFER
;
115 send_msg
->consumed_old
= buf
->consumed_old
;
117 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
119 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
121 DBG("app died while being traced");
122 return PUT_SUBBUF_DIED
;
123 } else if (result
< 0) {
124 ERR("put_subbuffer: ustcomm_req failed");
128 if (!recv_hdr
->result
) {
129 DBG("put subbuffer %s", buf
->name
);
130 return PUT_SUBBUF_OK
;
131 } else if (recv_hdr
->result
== -ENODATA
) {
132 DBG("For buffer %s, the trace was not found. This likely means"
133 " it was destroyed by the user.", buf
->name
);
134 return PUT_SUBBUF_DIED
;
137 DBG("error getting subbuffer %s", buf
->name
);
138 return recv_hdr
->result
;
141 void decrement_active_buffers(void *arg
)
143 struct ustconsumer_instance
*instance
= arg
;
144 pthread_mutex_lock(&instance
->mutex
);
145 instance
->active_buffers
--;
146 pthread_mutex_unlock(&instance
->mutex
);
149 static int get_pidunique(int sock
, s64
*pidunique
)
151 struct ustcomm_header _send_hdr
, *send_hdr
;
152 struct ustcomm_header _recv_hdr
, *recv_hdr
;
153 struct ustcomm_pidunique _recv_msg
, *recv_msg
;
156 send_hdr
= &_send_hdr
;
157 recv_hdr
= &_recv_hdr
;
158 recv_msg
= &_recv_msg
;
160 memset(send_hdr
, 0, sizeof(*send_hdr
));
162 send_hdr
->command
= GET_PIDUNIQUE
;
163 result
= ustcomm_req(sock
, send_hdr
, NULL
, recv_hdr
, (char *)recv_msg
);
167 if (recv_hdr
->result
< 0) {
168 ERR("App responded with error: %s", strerror(recv_hdr
->result
));
169 return recv_hdr
->result
;
172 *pidunique
= recv_msg
->pidunique
;
177 static int get_buf_shmid_pipe_fd(int sock
, struct buffer_info
*buf
,
178 int *buf_shmid
, int *buf_struct_shmid
,
181 struct ustcomm_header _send_hdr
, *send_hdr
;
182 struct ustcomm_header _recv_hdr
, *recv_hdr
;
183 struct ustcomm_buffer_info _send_msg
, *send_msg
;
184 struct ustcomm_buffer_info _recv_msg
, *recv_msg
;
185 int result
, recv_pipe_fd
;
187 send_hdr
= &_send_hdr
;
188 recv_hdr
= &_recv_hdr
;
189 send_msg
= &_send_msg
;
190 recv_msg
= &_recv_msg
;
192 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
193 buf
->channel
, buf
->channel_cpu
);
195 ERR("Failed to pack buffer info");
199 send_hdr
->command
= GET_BUF_SHMID_PIPE_FD
;
201 result
= ustcomm_send(sock
, send_hdr
, (char *)send_msg
);
203 ERR("Failed to send request");
206 result
= ustcomm_recv_fd(sock
, recv_hdr
, (char *)recv_msg
, &recv_pipe_fd
);
208 ERR("Failed to receive message and fd");
211 if (recv_hdr
->result
< 0) {
212 ERR("App responded with error %s", strerror(recv_hdr
->result
));
213 return recv_hdr
->result
;
216 *buf_shmid
= recv_msg
->buf_shmid
;
217 *buf_struct_shmid
= recv_msg
->buf_struct_shmid
;
218 *buf_pipe_fd
= recv_pipe_fd
;
223 static int get_subbuf_num_size(int sock
, struct buffer_info
*buf
,
224 int *subbuf_num
, int *subbuf_size
)
226 struct ustcomm_header _send_hdr
, *send_hdr
;
227 struct ustcomm_header _recv_hdr
, *recv_hdr
;
228 struct ustcomm_channel_info _send_msg
, *send_msg
;
229 struct ustcomm_channel_info _recv_msg
, *recv_msg
;
232 send_hdr
= &_send_hdr
;
233 recv_hdr
= &_recv_hdr
;
234 send_msg
= &_send_msg
;
235 recv_msg
= &_recv_msg
;
237 result
= ustcomm_pack_channel_info(send_hdr
, send_msg
, buf
->trace
,
243 send_hdr
->command
= GET_SUBBUF_NUM_SIZE
;
245 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
246 recv_hdr
, (char *)recv_msg
);
251 *subbuf_num
= recv_msg
->subbuf_num
;
252 *subbuf_size
= recv_msg
->subbuf_size
;
254 return recv_hdr
->result
;
258 static int notify_buffer_mapped(int sock
, struct buffer_info
*buf
)
260 struct ustcomm_header _send_hdr
, *send_hdr
;
261 struct ustcomm_header _recv_hdr
, *recv_hdr
;
262 struct ustcomm_buffer_info _send_msg
, *send_msg
;
265 send_hdr
= &_send_hdr
;
266 recv_hdr
= &_recv_hdr
;
267 send_msg
= &_send_msg
;
269 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
270 buf
->channel
, buf
->channel_cpu
);
275 send_hdr
->command
= NOTIFY_BUF_MAPPED
;
277 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
283 return recv_hdr
->result
;
287 struct buffer_info
*connect_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
288 const char *trace
, const char *channel
,
291 struct buffer_info
*buf
;
293 struct shmid_ds shmds
;
295 buf
= (struct buffer_info
*) zmalloc(sizeof(struct buffer_info
));
297 ERR("add_buffer: insufficient memory");
301 buf
->trace
= strdup(trace
);
306 buf
->channel
= strdup(channel
);
311 result
= asprintf(&buf
->name
, "%s_%d", channel
, channel_cpu
);
312 if (result
< 0 || buf
->name
== NULL
) {
313 goto free_buf_channel
;
316 buf
->channel_cpu
= channel_cpu
;
319 result
= ustcomm_connect_app(buf
->pid
, &buf
->app_sock
);
321 WARN("unable to connect to process, it probably died before we were able to connect");
326 result
= get_pidunique(buf
->app_sock
, &buf
->pidunique
);
328 ERR("Failed to get pidunique");
332 /* get shmid and pipe fd */
333 result
= get_buf_shmid_pipe_fd(buf
->app_sock
, buf
, &buf
->shmid
,
334 &buf
->bufstruct_shmid
, &buf
->pipe_fd
);
336 ERR("Failed to get buf_shmid and pipe_fd");
340 fstat(buf
->pipe_fd
, &temp
);
341 if (!S_ISFIFO(temp
.st_mode
)) {
342 ERR("Didn't receive a fifo from the app");
348 /* get number of subbufs and subbuf size */
349 result
= get_subbuf_num_size(buf
->app_sock
, buf
, &buf
->n_subbufs
,
352 ERR("Failed to get subbuf number and size");
357 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
358 if(buf
->mem
== (void *) 0) {
362 DBG("successfully attached buffer memory");
364 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
365 if(buf
->bufstruct_mem
== (void *) 0) {
369 DBG("successfully attached buffer bufstruct memory");
371 /* obtain info on the memory segment */
372 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
375 goto shmdt_bufstruct_mem
;
377 buf
->memlen
= shmds
.shm_segsz
;
379 /* Notify the application that we have mapped the buffer */
380 result
= notify_buffer_mapped(buf
->app_sock
, buf
);
382 goto shmdt_bufstruct_mem
;
385 if(instance
->callbacks
->on_open_buffer
)
386 instance
->callbacks
->on_open_buffer(instance
->callbacks
, buf
);
388 pthread_mutex_lock(&instance
->mutex
);
389 instance
->active_buffers
++;
390 pthread_mutex_unlock(&instance
->mutex
);
395 shmdt(buf
->bufstruct_mem
);
404 close(buf
->app_sock
);
420 static void destroy_buffer(struct ustconsumer_callbacks
*callbacks
,
421 struct buffer_info
*buf
)
425 result
= close(buf
->app_sock
);
427 WARN("problem calling ustcomm_close_app");
430 result
= shmdt(buf
->mem
);
435 result
= shmdt(buf
->bufstruct_mem
);
440 if(callbacks
->on_close_buffer
)
441 callbacks
->on_close_buffer(callbacks
, buf
);
446 int consumer_loop(struct ustconsumer_instance
*instance
, struct buffer_info
*buf
)
452 pthread_cleanup_push(decrement_active_buffers
, instance
);
455 read_result
= read(buf
->pipe_fd
, &read_buf
, 1);
456 /* get the subbuffer */
457 if (read_result
== 1) {
458 result
= get_subbuffer(buf
);
460 ERR("error getting subbuffer");
462 } else if (result
== GET_SUBBUF_DIED
) {
463 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
466 } else if ((read_result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
468 DBG("App died while being traced");
469 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
473 if(instance
->callbacks
->on_read_subbuffer
)
474 instance
->callbacks
->on_read_subbuffer(instance
->callbacks
, buf
);
476 /* put the subbuffer */
477 result
= put_subbuffer(buf
);
479 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
482 else if(result
== PUT_SUBBUF_PUSHED
) {
483 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
486 else if(result
== PUT_SUBBUF_DIED
) {
487 DBG("application died while putting subbuffer");
488 /* Skip the first subbuffer. We are not sure it is trustable
489 * because the put_subbuffer() did not complete.
491 /* TODO: check on_put_error return value */
492 if(instance
->callbacks
->on_put_error
)
493 instance
->callbacks
->on_put_error(instance
->callbacks
, buf
);
495 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
498 else if(result
== PUT_SUBBUF_DONE
) {
499 /* Done with this subbuffer */
500 /* FIXME: add a case where this branch is used? Upon
501 * normal trace termination, at put_subbuf time, a
502 * special last-subbuffer code could be returned by
507 else if(result
== PUT_SUBBUF_OK
) {
511 DBG("thread for buffer %s is stopping", buf
->name
);
513 /* FIXME: destroy, unalloc... */
515 pthread_cleanup_pop(1);
520 struct consumer_thread_args
{
525 struct ustconsumer_instance
*instance
;
528 void *consumer_thread(void *arg
)
530 struct buffer_info
*buf
;
531 struct consumer_thread_args
*args
= (struct consumer_thread_args
*) arg
;
535 if(args
->instance
->callbacks
->on_new_thread
)
536 args
->instance
->callbacks
->on_new_thread(args
->instance
->callbacks
);
538 /* Block signals that should be handled by the main thread. */
539 result
= sigemptyset(&sigset
);
541 PERROR("sigemptyset");
544 result
= sigaddset(&sigset
, SIGTERM
);
549 result
= sigaddset(&sigset
, SIGINT
);
554 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
556 PERROR("sigprocmask");
560 buf
= connect_buffer(args
->instance
, args
->pid
, args
->trace
,
561 args
->channel
, args
->channel_cpu
);
563 ERR("failed to connect to buffer");
567 consumer_loop(args
->instance
, buf
);
569 destroy_buffer(args
->instance
->callbacks
, buf
);
573 if(args
->instance
->callbacks
->on_close_thread
)
574 args
->instance
->callbacks
->on_close_thread(args
->instance
->callbacks
);
576 free((void *)args
->channel
);
581 int start_consuming_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
582 const char *trace
, const char *channel
,
586 struct consumer_thread_args
*args
;
589 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid
, channel
,
592 args
= (struct consumer_thread_args
*) zmalloc(sizeof(struct consumer_thread_args
));
598 args
->trace
= strdup(trace
);
599 args
->channel
= strdup(channel
);
600 args
->channel_cpu
= channel_cpu
;
601 args
->instance
= instance
;
602 DBG("beginning2 of start_consuming_buffer: args: pid %d trace %s"
603 " bufname %s_%d", args
->pid
, args
->trace
, args
->channel
, args
->channel_cpu
);
605 result
= pthread_create(&thr
, NULL
, consumer_thread
, args
);
607 ERR("pthread_create failed");
610 result
= pthread_detach(thr
);
612 ERR("pthread_detach failed");
615 DBG("end of start_consuming_buffer: args: pid %d trace %s "
616 "bufname %s_%d", args
->pid
, args
->channel
, args
->trace
, args
->channel_cpu
);
620 static void process_client_cmd(int sock
, struct ustcomm_header
*req_header
,
621 char *recvbuf
, struct ustconsumer_instance
*instance
)
624 struct ustcomm_header _res_header
;
625 struct ustcomm_header
*res_header
= &_res_header
;
626 struct ustcomm_buffer_info
*buf_inf
;
628 DBG("Processing client command");
630 switch (req_header
->command
) {
633 buf_inf
= (struct ustcomm_buffer_info
*)recvbuf
;
634 result
= ustcomm_unpack_buffer_info(buf_inf
);
636 ERR("Couldn't unpack buffer info");
640 DBG("Going to consume trace %s buffer %s_%d in process %d",
641 buf_inf
->trace
, buf_inf
->channel
, buf_inf
->ch_cpu
,
643 result
= start_consuming_buffer(instance
, buf_inf
->pid
,
648 ERR("error in add_buffer");
652 res_header
->result
= 0;
655 res_header
->result
= 0;
656 /* Only there to force poll to return */
659 res_header
->result
= -EINVAL
;
660 WARN("unknown command: %d", req_header
->command
);
663 if (ustcomm_send(sock
, res_header
, NULL
) <= 0) {
664 ERR("couldn't send command response");
668 #define MAX_EVENTS 10
670 int ustconsumer_start_instance(struct ustconsumer_instance
*instance
)
672 struct ustcomm_header recv_hdr
;
673 char recv_buf
[USTCOMM_BUFFER_SIZE
];
674 struct ustcomm_sock
*epoll_sock
;
675 struct epoll_event events
[MAX_EVENTS
];
676 struct sockaddr addr
;
677 int result
, epoll_fd
, accept_fd
, nfds
, i
, addr_size
, timeout
;
679 if(!instance
->is_init
) {
680 ERR("libustconsumer instance not initialized");
683 epoll_fd
= instance
->epoll_fd
;
689 nfds
= epoll_wait(epoll_fd
, events
, MAX_EVENTS
, timeout
);
690 if (nfds
== -1 && errno
== EINTR
) {
692 } else if (nfds
== -1) {
693 PERROR("ustconsumer_start_instance: epoll_wait failed");
697 for (i
= 0; i
< nfds
; ++i
) {
698 epoll_sock
= (struct ustcomm_sock
*)events
[i
].data
.ptr
;
699 if (epoll_sock
== instance
->listen_sock
) {
700 addr_size
= sizeof(struct sockaddr
);
701 accept_fd
= accept(epoll_sock
->fd
,
703 (socklen_t
*)&addr_size
);
704 if (accept_fd
== -1) {
705 PERROR("ustconsumer_start_instance: "
709 ustcomm_init_sock(accept_fd
, epoll_fd
,
710 &instance
->connections
);
712 result
= ustcomm_recv(epoll_sock
->fd
, &recv_hdr
,
715 ustcomm_del_sock(epoll_sock
, 0);
717 process_client_cmd(epoll_sock
->fd
,
725 if (instance
->quit_program
) {
726 pthread_mutex_lock(&instance
->mutex
);
727 if(instance
->active_buffers
== 0) {
728 pthread_mutex_unlock(&instance
->mutex
);
731 pthread_mutex_unlock(&instance
->mutex
);
736 if(instance
->callbacks
->on_trace_end
)
737 instance
->callbacks
->on_trace_end(instance
);
739 ustconsumer_delete_instance(instance
);
744 /* FIXME: threads and connections !? */
745 void ustconsumer_delete_instance(struct ustconsumer_instance
*instance
)
747 if (instance
->is_init
) {
748 ustcomm_del_named_sock(instance
->listen_sock
, 0);
749 close(instance
->epoll_fd
);
752 pthread_mutex_destroy(&instance
->mutex
);
753 free(instance
->sock_path
);
757 /* FIXME: Do something about the fixed path length, maybe get rid
758 * of the whole concept and use a pipe?
760 int ustconsumer_stop_instance(struct ustconsumer_instance
*instance
, int send_msg
)
768 instance
->quit_program
= 1;
773 /* Send a message through the socket to force poll to return */
775 struct sockaddr_un addr
;
777 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
783 addr
.sun_family
= AF_UNIX
;
785 strncpy(addr
.sun_path
, instance
->sock_path
, UNIX_PATH_MAX
);
786 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
788 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
793 while(bytes
!= sizeof(msg
))
794 bytes
+= send(fd
, msg
, sizeof(msg
), 0);
801 struct ustconsumer_instance
802 *ustconsumer_new_instance(struct ustconsumer_callbacks
*callbacks
,
805 struct ustconsumer_instance
*instance
=
806 zmalloc(sizeof(struct ustconsumer_instance
));
811 instance
->callbacks
= callbacks
;
812 instance
->quit_program
= 0;
813 instance
->is_init
= 0;
814 instance
->active_buffers
= 0;
815 pthread_mutex_init(&instance
->mutex
, NULL
);
818 instance
->sock_path
= strdup(sock_path
);
820 instance
->sock_path
= NULL
;
826 static int init_ustconsumer_socket(struct ustconsumer_instance
*instance
)
830 if (instance
->sock_path
) {
831 if (asprintf(&name
, "%s", instance
->sock_path
) < 0) {
832 ERR("ustcomm_init_ustconsumer : asprintf failed (sock_path %s)",
833 instance
->sock_path
);
839 /* Only check if socket dir exists if we are using the default directory */
840 result
= ensure_dir_exists(SOCK_DIR
);
842 ERR("Unable to create socket directory %s", SOCK_DIR
);
846 if (asprintf(&name
, "%s/%s", SOCK_DIR
, "ustconsumer") < 0) {
847 ERR("ustcomm_init_ustconsumer : asprintf failed (%s/ustconsumer)",
854 instance
->epoll_fd
= epoll_create(MAX_EVENTS
);
855 if (instance
->epoll_fd
== -1) {
856 ERR("epoll_create failed, start instance bailing");
860 /* Create the named socket */
861 instance
->listen_sock
= ustcomm_init_named_socket(name
,
863 if(!instance
->listen_sock
) {
864 ERR("error initializing named socket at %s", name
);
868 CDS_INIT_LIST_HEAD(&instance
->connections
);
875 close(instance
->epoll_fd
);
882 int ustconsumer_init_instance(struct ustconsumer_instance
*instance
)
885 result
= init_ustconsumer_socket(instance
);
887 ERR("failed to initialize socket");
890 instance
->is_init
= 1;