/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; only version 2
 * of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <urcu/list.h>

#include "kernelctl.h"
#include "lttkconsumerd.h"
#include "lttngerr.h"
/*
 * Global consumer daemon data, shared between the threads.
 */
static struct kconsumerd_global_data {
	/*
	 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
	 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
	 * ensures the count matches the number of items in the fd_list.
	 * It ensures the list updates *always* trigger an fd_array
	 * update (therefore we need to make the list update vs the
	 * kconsumerd_data.need_update flag update atomic, and also the
	 * flag read, fd array update and flag clear atomic).
	 */
	pthread_mutex_t lock;
	/*
	 * Number of elements in the list below. Protected by
	 * kconsumerd_data.lock.
	 */
	unsigned int fds_count;
	/*
	 * List of FDs. Protected by kconsumerd_data.lock.
	 */
	struct kconsumerd_fd_list fd_list;
	/*
	 * Flag specifying if the local array of FDs needs update in the
	 * poll function. Protected by kconsumerd_data.lock.
	 */
	unsigned int need_update;
} kconsumerd_data = {
	.lock = PTHREAD_MUTEX_INITIALIZER,
	.fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
};
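
/*
 * Illustration only (kept out of the build): the protocol described in the
 * comments above means every writer must mutate the list, the count and the
 * need_update flag inside a single critical section, so readers never see a
 * count that disagrees with the list. A hypothetical producer-side update:
 */
#if 0
	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_add(&new_fd->list, &kconsumerd_data.fd_list.head);	/* new_fd: hypothetical */
	kconsumerd_data.fds_count++;
	kconsumerd_data.need_update = 1;	/* same critical section as the list update */
	pthread_mutex_unlock(&kconsumerd_data.lock);
#endif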
/* communication with splice */
static int kconsumerd_thread_pipe[2];

/* pipe to wake the poll thread when necessary */
static int kconsumerd_poll_pipe[2];

/* to let the signal handler wake up the fd receiver thread */
static int kconsumerd_should_quit[2];

/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;

/* socket to communicate errors with sessiond */
static int kconsumerd_error_socket;

/* socket to exchange commands with sessiond */
static char *kconsumerd_command_sock_path;

/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by kconsumerd_thread_receive_fds when it notices that all fds
 * have hung up. Also updated by the signal handler
 * (kconsumerd_should_exit()). Read by the polling threads.
 */
static volatile int kconsumerd_quit = 0;
/*
 * kconsumerd_set_error_socket
 *
 * Set the error socket.
 */
void kconsumerd_set_error_socket(int sock)
{
	kconsumerd_error_socket = sock;
}
/*
 * kconsumerd_set_command_socket_path
 *
 * Set the command socket path.
 */
void kconsumerd_set_command_socket_path(char *sock)
{
	kconsumerd_command_sock_path = sock;
}
/*
 * kconsumerd_find_session_fd
 *
 * Find a session fd in the global list.
 * The kconsumerd_data.lock must be held during this call.
 *
 * Return 1 if found, else 0.
 */
static int kconsumerd_find_session_fd(int fd)
{
	struct kconsumerd_fd *iter;

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == fd) {
			DBG("Duplicate session fd %d", fd);
			return 1;
		}
	}

	return 0;
}
/*
 * kconsumerd_del_fd
 *
 * Remove a fd from the global list, protected by a mutex.
 */
static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
{
	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_del(&lcf->list);
	if (kconsumerd_data.fds_count > 0) {
		kconsumerd_data.fds_count--;
	}
	/* release the output tracefile and the fd received from sessiond */
	close(lcf->out_fd);
	close(lcf->consumerd_fd);
	free(lcf);
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}
/*
 * kconsumerd_add_fd
 *
 * Add a fd to the global list, protected by a mutex.
 */
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
	int ret;
	struct kconsumerd_fd *tmp_fd;

	pthread_mutex_lock(&kconsumerd_data.lock);
	/* Check if the fd already exists */
	ret = kconsumerd_find_session_fd(buf->fd);
	if (ret == 1) {
		goto end;
	}

	tmp_fd = malloc(sizeof(struct kconsumerd_fd));
	tmp_fd->sessiond_fd = buf->fd;
	tmp_fd->consumerd_fd = consumerd_fd;
	tmp_fd->state = buf->state;
	tmp_fd->max_sb_size = buf->max_sb_size;
	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
	tmp_fd->path_name[PATH_MAX - 1] = '\0';

	/* Open the tracefile in write mode */
	ret = open(tmp_fd->path_name, O_WRONLY|O_CREAT|O_TRUNC,
			S_IRWXU|S_IRWXG|S_IRWXO);
	if (ret < 0) {
		ERR("Opening %s", tmp_fd->path_name);
		perror("open");
		goto end;
	}
	tmp_fd->out_fd = ret;
	tmp_fd->out_fd_offset = 0;

	DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
			tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);

	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
	kconsumerd_data.fds_count++;
	kconsumerd_data.need_update = 1;

end:
	pthread_mutex_unlock(&kconsumerd_data.lock);
	return ret;
}
/*
 * kconsumerd_change_fd_state
 *
 * Update a fd according to what we just received.
 */
static void kconsumerd_change_fd_state(int sessiond_fd,
		enum kconsumerd_fd_state state)
{
	struct kconsumerd_fd *iter;

	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == sessiond_fd) {
			iter->state = state;
			break;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}
/*
 * kconsumerd_update_poll_array
 *
 * Allocate the pollfd structure and the local view of the out fds
 * to avoid doing a lookup in the linked list and concurrency issues
 * when writing is needed. Called with kconsumerd_data.lock held.
 *
 * Returns the number of fds in the structures.
 */
static int kconsumerd_update_poll_array(struct pollfd **pollfd,
		struct kconsumerd_fd **local_kconsumerd_fd)
{
	struct kconsumerd_fd *iter;
	int i = 0;

	DBG("Updating poll fd array");

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		DBG("Inside for each");
		if (iter->state == ACTIVE_FD) {
			DBG("Active FD %d", iter->consumerd_fd);
			(*pollfd)[i].fd = iter->consumerd_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_kconsumerd_fd[i] = iter;
			i++;
		}
	}

	/*
	 * Insert the kconsumerd_poll_pipe at the end of the array and don't
	 * increment i, so nb_fd is the number of real FDs.
	 */
	(*pollfd)[i].fd = kconsumerd_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;
	return i;
}
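
/*
 * Illustration only (kept out of the build): the extra slot filled above is
 * what lets other threads wake this poller without a poll() timeout. After
 * setting need_update under kconsumerd_data.lock, a writer just writes one
 * byte to the pipe, which shows up as POLLIN on (*pollfd)[nb_fd]:
 */
#if 0
	(void) write(kconsumerd_poll_pipe[1], "4", 1);	/* wake up the poll thread */
#endif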
/*
 * kconsumerd_on_read_subbuffer_mmap
 *
 * mmap the ring buffer, read it and write the data to the tracefile.
 * Returns the number of bytes written.
 */
static int kconsumerd_on_read_subbuffer_mmap(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
	char *mmap_base;
	char *padding = NULL;
	long ret = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	/* get the padded subbuffer size to know the padding required */
	ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_padded_subbuf_size");
		goto end;
	}
	padding_len = padded_len - len;
	padding = malloc(padding_len * sizeof(char));
	memset(padding, '\0', padding_len);

	/* get the len of the mmap region */
	ret = kernctl_get_mmap_len(fd, &mmap_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_len");
		goto end;
	}

	/* get the offset inside the fd to mmap */
	ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_read_offset");
		goto end;
	}

	mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
	if (mmap_base == MAP_FAILED) {
		perror("Error mmaping");
		ret = -1;
		goto end;
	}

	while (len > 0) {
		ret = write(outfd, mmap_base, len);
		if (ret < 0) {
			perror("Error in file write");
			goto end;
		} else if (ret >= len) {
			len = 0;
		} else {
			/* partial write: continue from where the write stopped */
			mmap_base += ret;
			len -= ret;
		}
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/* once all the data is written, write the padding to disk */
	ret = write(outfd, padding, padding_len);
	if (ret < 0) {
		perror("Error writing padding to file");
		goto end;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);

		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED: we won't re-access the data in the near
		 * future after we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of
		 * the file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete, because the dirty page writeback semantic is not well
		 * defined, so it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}

end:
	if (padding != NULL) {
		free(padding);
	}
	return ret;
}
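
/*
 * Sketch only (kept out of the build, assumptions flagged): the
 * sync_file_range() + posix_fadvise() sequence above reappears verbatim in
 * the splice path below; a hypothetical helper (flush_previous_subbuffer is
 * not part of this file) factoring it out would look like this:
 */
#if 0
static void flush_previous_subbuffer(int outfd, off_t orig_offset,
		unsigned long max_sb_size)
{
	if (orig_offset < max_sb_size) {
		return;
	}
	/* blocking write-and-wait on the pages of the previous subbuffer */
	sync_file_range(outfd, orig_offset - max_sb_size, max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/* then ask the kernel to drop those pages from the page cache */
	posix_fadvise(outfd, orig_offset - max_sb_size, max_sb_size,
			POSIX_FADV_DONTNEED);
}
#endif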
/*
 * kconsumerd_on_read_subbuffer
 *
 * Splice the data from the ring buffer to the tracefile.
 * Returns the number of bytes spliced.
 */
static int kconsumerd_on_read_subbuffer(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	long ret = 0;
	loff_t offset = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	while (len > 0) {
		DBG("splice chan to pipe offset %lu (fd : %d)",
				(unsigned long)offset, fd);
		ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe ret %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in relay splice");
			goto splice_error;
		}

		ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice pipe to file %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in file splice");
			goto splice_error;
		}
		if (ret >= len) {
			len = 0;
		} else {
			len -= ret;
		}

		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);

		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED: we won't re-access the data in the near
		 * future after we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of
		 * the file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete, because the dirty page writeback semantic is not well
		 * defined, so it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EBADF:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
		break;
	case EINVAL:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
		break;
	default:
		break;
	}

end:
	return ret;
}
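
/*
 * Illustration only (kept out of the build): splice() cannot move data
 * between two non-pipe file descriptors; at least one end of each call must
 * be a pipe. That is why the function above relays every subbuffer through
 * kconsumerd_thread_pipe in two hops, which in isolation reduces to:
 */
#if 0
	/* ring buffer fd -> pipe, no copy through user space */
	ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
			SPLICE_F_MOVE | SPLICE_F_MORE);
	/* pipe -> tracefile, again without a user-space copy */
	ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
			SPLICE_F_MOVE | SPLICE_F_MORE);
#endif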
/*
 * kconsumerd_read_subbuffer
 *
 * Consume data on a file descriptor and write it to a trace file.
 */
static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
{
	unsigned long len;
	int err;
	int ret = 0;
	int infd = kconsumerd_fd->consumerd_fd;

	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);

	/* Get the next subbuffer */
	err = kernctl_get_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		perror("Reserving sub buffer failed (everything is normal, "
				"it is due to concurrency)");
		goto end;
	}

	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
	case LTTNG_EVENT_SPLICE:
		/* read the whole subbuffer */
		err = kernctl_get_padded_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}

		/* splice the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error splicing to tracefile");
		}
		break;
	case LTTNG_EVENT_MMAP:
		/* read the used subbuffer size */
		err = kernctl_get_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}

		/* write the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error writing to tracefile");
		}
		break;
	default:
		ERR("Unknown output method");
		ret = -1;
		break;
	}

	err = kernctl_put_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		if (errno == EFAULT) {
			perror("Error in unreserving sub buffer");
		} else if (errno == EIO) {
			/* Should never happen with newer LTTng versions */
			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
		}
		goto end;
	}

end:
	return ret;
}
/*
 * kconsumerd_poll_socket
 *
 * Poll on the should_quit pipe and the command socket.
 * Return -1 on error (the caller should exit), 0 if data is
 * available on the command socket.
 */
int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
{
	int num_rdy;

	num_rdy = poll(kconsumerd_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	if (kconsumerd_sockpoll[0].revents == POLLIN) {
		DBG("kconsumerd_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
/*
 * kconsumerd_consumerd_recv_fd
 *
 * Receives an array of file descriptors and the associated
 * structures describing each fd (path name).
 * Returns the size of received data.
 */
static int kconsumerd_consumerd_recv_fd(int sfd,
		struct pollfd *kconsumerd_sockpoll, int size,
		enum kconsumerd_command cmd_type)
{
	struct iovec iov[1];
	int ret = 0, i, tmp2;
	struct cmsghdr *cmsg;
	int nb_fd;
	char recv_fd[CMSG_SPACE(sizeof(int))];
	struct lttcomm_kconsumerd_msg lkm;

	/* the number of fds we are about to receive */
	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);

	/*
	 * nb_fd is the number of fds we receive, one fd per recvmsg.
	 */
	for (i = 0; i < nb_fd; i++) {
		struct msghdr msg = { 0 };

		/* Prepare to receive the structures */
		iov[0].iov_base = &lkm;
		iov[0].iov_len = sizeof(lkm);
		msg.msg_iov = iov;
		msg.msg_iovlen = 1;

		msg.msg_control = recv_fd;
		msg.msg_controllen = sizeof(recv_fd);

		DBG("Waiting to receive fd");
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}

		if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
			perror("recvmsg");
			continue;
		}

		if (ret != (size / nb_fd)) {
			ERR("Received only %d, expected %d", ret, size);
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		cmsg = CMSG_FIRSTHDR(&msg);
		if (!cmsg) {
			ERR("Invalid control message header");
			ret = -1;
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		/* if we received fds */
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
			switch (cmd_type) {
			case ADD_STREAM:
				DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
						((int *) CMSG_DATA(cmsg))[0]);
				ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
				if (ret < 0) {
					kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
					goto end;
				}
				break;
			case UPDATE_STREAM:
				kconsumerd_change_fd_state(lkm.fd, lkm.state);
				break;
			default:
				break;
			}

			/* signal the poll thread */
			tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
			if (tmp2 < 0) {
				perror("write kconsumerd poll");
			}
		} else {
			ERR("Didn't receive any fd");
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			ret = -1;
			goto end;
		}
	}

end:
	return ret;
}
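
/*
 * Sketch only (kept out of the build): the sending side of this SCM_RIGHTS
 * exchange lives in ltt-sessiond, not here. For reference, passing one fd
 * over a unix socket is roughly the mirror image of the recvmsg() loop
 * above (hypothetical variables, not the actual sessiond code):
 */
#if 0
	struct msghdr msg = { 0 };
	struct cmsghdr *cmsgp;
	char ctl[CMSG_SPACE(sizeof(int))];

	msg.msg_iov = iov;			/* payload: one lttcomm_kconsumerd_msg */
	msg.msg_iovlen = 1;
	msg.msg_control = ctl;
	msg.msg_controllen = sizeof(ctl);
	cmsgp = CMSG_FIRSTHDR(&msg);
	cmsgp->cmsg_level = SOL_SOCKET;
	cmsgp->cmsg_type = SCM_RIGHTS;
	cmsgp->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsgp), &fd_to_pass, sizeof(int));	/* fd_to_pass: hypothetical */
	sendmsg(sock, &msg, 0);
#endif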
/*
 * kconsumerd_thread_poll_fds
 *
 * This thread polls the fds in the fd list to consume the data
 * and write it to the tracefile if necessary.
 */
void *kconsumerd_thread_poll_fds(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i;
	struct pollfd *pollfd = NULL;
	/* local view of the fds */
	struct kconsumerd_fd **local_kconsumerd_fd = NULL;
	/* local view of kconsumerd_data.fds_count */
	int nb_fd = 0;
	char tmp;
	int tmp2;

	ret = pipe(kconsumerd_thread_pipe);
	if (ret < 0) {
		perror("Error creating pipe");
		goto end;
	}

	local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));

	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the fd list has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&kconsumerd_data.lock);
		if (kconsumerd_data.need_update) {
			if (pollfd != NULL) {
				free(pollfd);
				pollfd = NULL;
			}
			if (local_kconsumerd_fd != NULL) {
				free(local_kconsumerd_fd);
				local_kconsumerd_fd = NULL;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				perror("pollfd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
					sizeof(struct kconsumerd_fd));
			if (local_kconsumerd_fd == NULL) {
				perror("local_kconsumerd_fd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			nb_fd = ret;
			kconsumerd_data.need_update = 0;
		}
		pthread_mutex_unlock(&kconsumerd_data.lock);

		/* poll on the array of fds */
		DBG("polling on %d fd", nb_fd + 1);
		num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			perror("Poll error");
			kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
		if (nb_fd == 0 && kconsumerd_quit == 1) {
			goto end;
		}

		/*
		 * If the kconsumerd_poll_pipe triggered poll, go
		 * directly to the beginning of the loop to update the
		 * array. We want to prioritize array updates over
		 * low-priority reads.
		 */
		if (pollfd[nb_fd].revents == POLLIN) {
			DBG("kconsumerd_poll_pipe wake up");
			tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
			if (tmp2 < 0) {
				perror("read kconsumerd poll");
			}
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			switch (pollfd[i].revents) {
			case POLLERR:
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLHUP:
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLNVAL:
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLPRI:
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
				/* it's ok to have an unavailable sub-buffer */
				if (ret == EAGAIN) {
					ret = 0;
				}
				break;
			}
		}

		/* If every buffer FD has hung up, we end the read loop here */
		if (nb_fd > 0 && num_hup == nb_fd) {
			DBG("every buffer FD has hung up\n");
			if (kconsumerd_quit == 1) {
				goto end;
			}
		}

		/* Take care of low priority channels. */
		if (high_prio == 0) {
			for (i = 0; i < nb_fd; i++) {
				if (pollfd[i].revents == POLLIN) {
					DBG("Normal read on fd %d", pollfd[i].fd);
					ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
					/* it's ok to have an unavailable subbuffer */
					if (ret == EAGAIN) {
						ret = 0;
					}
				}
			}
		}
	}

end:
	DBG("polling thread exiting");
	if (pollfd != NULL) {
		free(pollfd);
		pollfd = NULL;
	}
	if (local_kconsumerd_fd != NULL) {
		free(local_kconsumerd_fd);
		local_kconsumerd_fd = NULL;
	}
	return NULL;
}
/*
 * kconsumerd_init
 *
 * Initialise the necessary environment:
 * - inform the polling thread to update the polling array
 * - create the poll_pipe
 * - create the should_quit pipe (for the signal handler)
 */
int kconsumerd_init(void)
{
	int ret;

	/* need to update the polling array at init time */
	kconsumerd_data.need_update = 1;

	ret = pipe(kconsumerd_poll_pipe);
	if (ret < 0) {
		perror("Error creating poll pipe");
		goto end;
	}

	ret = pipe(kconsumerd_should_quit);
	if (ret < 0) {
		perror("Error creating recv pipe");
		goto end;
	}

end:
	return ret;
}
/*
 * kconsumerd_thread_receive_fds
 *
 * This thread listens on the consumerd socket and receives the file
 * descriptors from ltt-sessiond.
 */
void *kconsumerd_thread_receive_fds(void *data)
{
	int sock, client_socket, ret;
	struct lttcomm_kconsumerd_header tmp;
	/*
	 * structure to poll for incoming data on communication socket,
	 * avoids making blocking sockets
	 */
	struct pollfd kconsumerd_sockpoll[2];

	DBG("Creating command socket %s", kconsumerd_command_sock_path);
	unlink(kconsumerd_command_sock_path);
	client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to ltt-sessiond");
	ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
	if (ret < 0) {
		ERR("Error sending ready command to ltt-sessiond");
		goto end;
	}

	ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* prepare the FDs to poll : the client socket and the should_quit pipe */
	kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
	kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
	kconsumerd_sockpoll[1].fd = client_socket;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

	if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		goto end;
	}
	ret = fcntl(sock, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* update the polling structure to poll on the established socket */
	kconsumerd_sockpoll[1].fd = sock;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

	while (1) {
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}
		DBG("Incoming fds on sock");

		/* We first get the number of fd we are about to receive */
		ret = lttcomm_recv_unix_sock(sock, &tmp,
				sizeof(struct lttcomm_kconsumerd_header));
		if (ret <= 0) {
			ERR("Communication interrupted on command socket");
			goto end;
		}
		if (tmp.cmd_type == STOP) {
			DBG("Received STOP command");
			goto end;
		}
		if (kconsumerd_quit) {
			DBG("kconsumerd_thread_receive_fds received quit from signal");
			goto end;
		}

		/* we received a command to add or update fds */
		ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
				tmp.payload_size, tmp.cmd_type);
		if (ret <= 0) {
			ERR("Receiving the FD, exiting");
			goto end;
		}
		DBG("received fds on sock");
	}

end:
	DBG("kconsumerd_thread_receive_fds exiting");

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	kconsumerd_quit = 1;

	/*
	 * 2s of grace period, if no polling events occur during
	 * this period, the polling thread will exit even if there
	 * are still open FDs (should not happen, but safety mechanism).
	 */
	kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;

	/* wake up the polling thread */
	ret = write(kconsumerd_poll_pipe[1], "4", 1);
	if (ret < 0) {
		perror("poll pipe write");
	}
	return NULL;
}
/*
 * kconsumerd_cleanup
 *
 * Cleanup the daemon's socket on exit.
 */
void kconsumerd_cleanup(void)
{
	struct kconsumerd_fd *iter, *tmp;

	/* remove the socket file */
	unlink(kconsumerd_command_sock_path);

	/*
	 * close all outfd. Called when there are no more threads
	 * running (after joining on the threads), no need to protect
	 * the list iteration with a mutex.
	 */
	cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head,
			list) {
		kconsumerd_del_fd(iter);
	}
}
/*
 * kconsumerd_should_exit
 *
 * Called from the signal handler: wake up the fd receiver thread.
 */
void kconsumerd_should_exit(void)
{
	int ret;

	kconsumerd_quit = 1;
	ret = write(kconsumerd_should_quit[1], "4", 1);
	if (ret < 0) {
		perror("write kconsumerd quit");
	}
}
/*
 * kconsumerd_send_error
 *
 * Send a return code to ltt-sessiond.
 */
int kconsumerd_send_error(enum lttcomm_return_code cmd)
{
	if (kconsumerd_error_socket > 0) {
		return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}
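
/*
 * Usage sketch only (kept out of the build): a daemon embedding this module
 * would roughly initialise and run the two threads as below. This is a
 * hypothetical main(), not the actual ltt-kconsumerd entry point:
 */
#if 0
	kconsumerd_set_command_socket_path(command_sock_path);	/* hypothetical path */
	kconsumerd_set_error_socket(error_sock);		/* hypothetical socket */
	if (kconsumerd_init() < 0) {
		exit(EXIT_FAILURE);
	}
	pthread_create(&fd_thread, NULL, kconsumerd_thread_receive_fds, NULL);
	pthread_create(&poll_thread, NULL, kconsumerd_thread_poll_fds, NULL);
	pthread_join(fd_thread, NULL);
	pthread_join(poll_thread, NULL);
	kconsumerd_cleanup();
#endif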