/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; only version 2
 * of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */

#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <urcu/list.h>

#include "kernelctl.h"
#include "lttkconsumerd.h"

struct kconsumerd_global_data {
	/*
	 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
	 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
	 * ensures the count matches the number of items in the fd_list.
	 * It ensures the list updates *always* trigger an fd_array
	 * update (therefore need to make list update vs
	 * kconsumerd_data.need_update flag update atomic, and also flag
	 * read, fd array and flag clear atomic).
	 */
	pthread_mutex_t lock;
	/*
	 * Number of elements in the list below. Protected by
	 * kconsumerd_data.lock.
	 */
	unsigned int fds_count;
	/* List of FDs. Protected by kconsumerd_data.lock. */
	struct kconsumerd_fd_list fd_list;
	/*
	 * Flag specifying if the local array of FDs needs update in the
	 * poll function. Protected by kconsumerd_data.lock.
	 */
	unsigned int need_update;
} kconsumerd_data = {
	.lock = PTHREAD_MUTEX_INITIALIZER,
	.fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
};
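
/*
 * Illustration (not part of the original file): the discipline the comment
 * above requires. Every fd_list/fds_count mutation and the matching
 * need_update flag write must share one critical section, as the helpers
 * below do. A sketch of the pattern:
 */
#if 0
	pthread_mutex_lock(&kconsumerd_data.lock);
	/* ... add or remove an entry in kconsumerd_data.fd_list,
	 * adjusting kconsumerd_data.fds_count accordingly ... */
	kconsumerd_data.need_update = 1;	/* always set with the mutation */
	pthread_mutex_unlock(&kconsumerd_data.lock);
#endif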

/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;

/*
 * flag to inform the polling thread to quit when all fds have hung up.
 * Updated by kconsumerd_thread_receive_fds when it notices that all
 * fds have hung up. Also updated by the signal handler
 * (kconsumerd_should_exit()). Read by the polling threads.
 */
static volatile int kconsumerd_quit = 0;

/*
 * kconsumerd_set_error_socket
 *
 * Set the error socket.
 */
void kconsumerd_set_error_socket(struct kconsumerd_local_data *ctx, int sock)
{
	ctx->kconsumerd_error_socket = sock;
}

/*
 * kconsumerd_set_command_socket_path
 *
 * Set the command socket path.
 */
void kconsumerd_set_command_socket_path(struct kconsumerd_local_data *ctx,
		char *sock)
{
	ctx->kconsumerd_command_sock_path = sock;
}

/*
 * kconsumerd_find_session_fd
 *
 * Find a session fd in the global list.
 * The kconsumerd_data.lock must be locked during this call.
 *
 * Return 1 if found else 0.
 */
static int kconsumerd_find_session_fd(int fd)
{
	struct kconsumerd_fd *iter;

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == fd) {
			DBG("Duplicate session fd %d", fd);
			return 1;
		}
	}

	return 0;
}

/*
 * kconsumerd_del_fd
 *
 * Remove a fd from the global list protected by a mutex.
 */
static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
{
	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_del(&lcf->list);
	if (kconsumerd_data.fds_count > 0) {
		kconsumerd_data.fds_count--;
		if (lcf != NULL) {
			/* close the tracefile, if one was opened for this stream */
			if (lcf->out_fd != 0) {
				close(lcf->out_fd);
			}
			close(lcf->consumerd_fd);
			free(lcf);
			lcf = NULL;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_add_fd
 *
 * Add a fd to the global list protected by a mutex.
 */
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
	int ret;
	struct kconsumerd_fd *tmp_fd;

	pthread_mutex_lock(&kconsumerd_data.lock);
	/* Check if it already exists */
	ret = kconsumerd_find_session_fd(buf->fd);
	if (ret == 1) {
		goto end;
	}

	tmp_fd = malloc(sizeof(struct kconsumerd_fd));
	if (tmp_fd == NULL) {
		perror("malloc kconsumerd_fd");
		ret = -1;
		goto end;
	}
	tmp_fd->sessiond_fd = buf->fd;
	tmp_fd->consumerd_fd = consumerd_fd;
	tmp_fd->state = buf->state;
	tmp_fd->max_sb_size = buf->max_sb_size;
	tmp_fd->out_fd = 0;
	tmp_fd->out_fd_offset = 0;
	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
	tmp_fd->path_name[PATH_MAX - 1] = '\0';

	/* Opening the tracefile in write mode */
	if (tmp_fd->path_name != NULL) {
		ret = open(tmp_fd->path_name,
				O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
		if (ret < 0) {
			ERR("Opening %s", tmp_fd->path_name);
			perror("open");
			goto end;
		}
		tmp_fd->out_fd = ret;
		DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
				tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
	}

	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
	kconsumerd_data.fds_count++;
	kconsumerd_data.need_update = 1;

end:
	pthread_mutex_unlock(&kconsumerd_data.lock);
	return ret;
}

/*
 * kconsumerd_change_fd_state
 *
 * Update a fd according to what we just received.
 */
static void kconsumerd_change_fd_state(int sessiond_fd,
		enum kconsumerd_fd_state state)
{
	struct kconsumerd_fd *iter;

	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == sessiond_fd) {
			iter->state = state;
			break;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_update_poll_array
 *
 * Allocate the pollfd structure and the local view of the out fds
 * to avoid doing a lookup in the linked list and concurrency issues
 * when writing is needed.
 * Returns the number of fds in the structures.
 * Called with kconsumerd_data.lock held.
 */
static int kconsumerd_update_poll_array(struct kconsumerd_local_data *ctx,
		struct pollfd **pollfd, struct kconsumerd_fd **local_kconsumerd_fd)
{
	struct kconsumerd_fd *iter;
	int i = 0;

	DBG("Updating poll fd array");
	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->state == ACTIVE_FD) {
			DBG("Active FD %d", iter->consumerd_fd);
			(*pollfd)[i].fd = iter->consumerd_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_kconsumerd_fd[i] = iter;
			i++;
		}
	}

	/*
	 * insert the kconsumerd_poll_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD
	 */
	(*pollfd)[i].fd = ctx->kconsumerd_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;

	return i;
}
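
/*
 * Illustration (not in the original): with N streams in ACTIVE_FD state, the
 * caller ends up with (*pollfd)[0..N-1] holding the stream consumerd_fds,
 * (*pollfd)[N] holding the read end of kconsumerd_poll_pipe, and a return
 * value of N; the poll thread therefore polls on N + 1 entries.
 */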

/*
 * kconsumerd_on_read_subbuffer_mmap
 *
 * mmap the ring buffer, read it and write the data to the tracefile.
 * Returns the number of bytes written.
 */
int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_local_data *ctx,
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
	char *mmap_base;
	char *padding = NULL;
	long ret = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	/* get the padded subbuffer size to know the padding required */
	ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_padded_subbuf_size");
		goto end;
	}
	padding_len = padded_len - len;
	padding = malloc(padding_len * sizeof(char));
	if (padding == NULL) {
		ret = -1;
		goto end;
	}
	memset(padding, '\0', padding_len);

	/* get the len of the mmap region */
	ret = kernctl_get_mmap_len(fd, &mmap_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_len");
		goto end;
	}

	/* get the offset inside the fd to mmap */
	ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_read_offset");
		goto end;
	}

	mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
	if (mmap_base == MAP_FAILED) {
		perror("Error mmaping");
		ret = -1;
		goto end;
	}

	while (len > 0) {
		ret = write(outfd, mmap_base, len);
		if (ret < 0) {
			ret = errno;
			perror("Error in file write");
			goto end;
		} else if (ret >= len) {
			len = 0;
		} else {
			/* partial write: advance within the mapping and retry */
			mmap_base += ret;
			len -= ret;
		}
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/* once all the data is written, write the padding to disk */
	ret = write(outfd, padding, padding_len);
	if (ret < 0) {
		ret = errno;
		perror("Error writing padding to file");
		goto end;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);
		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}

end:
	if (padding != NULL) {
		free(padding);
	}
	return ret;
}
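
/*
 * Worked example (not in the original) of the write-behind pattern used by
 * both subbuffer readers: with max_sb_size = 1 MiB and orig_offset = 3 MiB,
 * the blocking sync_file_range() flushes the byte range [2 MiB, 3 MiB) --
 * the previously written subbuffer -- and the posix_fadvise() call that
 * follows drops those now-clean pages from the page cache.
 */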

/*
 * kconsumerd_on_read_subbuffer_splice
 *
 * Splice the data from the ring buffer to the tracefile.
 * Returns the number of bytes spliced.
 */
int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_local_data *ctx,
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	long ret = 0;
	loff_t offset = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	while (len > 0) {
		DBG("splice chan to pipe offset %lu (fd : %d)",
				(unsigned long)offset, fd);
		ret = splice(fd, &offset, ctx->kconsumerd_thread_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe ret %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in relay splice");
			goto splice_error;
		}

		ret = splice(ctx->kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice pipe to file %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in file splice");
			goto splice_error;
		}
		len -= ret;

		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);
		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EBADF:
		kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EBADF);
		break;
	case EINVAL:
		kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	return ret;
}

/*
 * kconsumerd_poll_socket
 *
 * Poll on the should_quit pipe and the command socket.
 * Return -1 on error and should exit, 0 if data is
 * available on the command socket.
 */
int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
{
	int num_rdy;

	num_rdy = poll(kconsumerd_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	if (kconsumerd_sockpoll[0].revents == POLLIN) {
		DBG("kconsumerd_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
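
/*
 * Caller contract sketch (illustration, not in the original): the two-entry
 * array polled above, as kconsumerd_thread_receive_fds() sets it up below:
 *   kconsumerd_sockpoll[0] = read end of the should_quit pipe (quit wins),
 *   kconsumerd_sockpoll[1] = the command socket.
 */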

/*
 * kconsumerd_consumerd_recv_fd
 *
 * Receives an array of file descriptors and the associated
 * structures describing each fd (path name).
 * Returns the size of received data.
 */
static int kconsumerd_consumerd_recv_fd(struct kconsumerd_local_data *ctx,
		int sfd, struct pollfd *kconsumerd_sockpoll, int size,
		enum kconsumerd_command cmd_type)
{
	struct iovec iov[1];
	int ret = 0, i, tmp2;
	struct cmsghdr *cmsg;
	int nb_fd;
	char recv_fd[CMSG_SPACE(sizeof(int))];
	struct lttcomm_kconsumerd_msg lkm;

	/* the number of fds we are about to receive */
	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);

	/*
	 * nb_fd is the number of fds we receive. One fd per recvmsg.
	 */
	for (i = 0; i < nb_fd; i++) {
		struct msghdr msg = { 0 };

		/* Prepare to receive the structures */
		iov[0].iov_base = &lkm;
		iov[0].iov_len = sizeof(lkm);
		msg.msg_iov = iov;
		msg.msg_iovlen = 1;

		msg.msg_control = recv_fd;
		msg.msg_controllen = sizeof(recv_fd);

		DBG("Waiting to receive fd");
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}

		if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
			perror("recvmsg");
			continue;
		}

		if (ret != (size / nb_fd)) {
			ERR("Received only %d, expected %d", ret, size);
			kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		cmsg = CMSG_FIRSTHDR(&msg);
		if (!cmsg) {
			ERR("Invalid control message header");
			ret = -1;
			kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		/* if we received fds */
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
			switch (cmd_type) {
			case ADD_STREAM:
				DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]);
				ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
				if (ret < 0) {
					kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
					goto end;
				}
				break;
			case UPDATE_STREAM:
				kconsumerd_change_fd_state(lkm.fd, lkm.state);
				break;
			default:
				break;
			}

			/* signal the poll thread */
			tmp2 = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
			if (tmp2 < 0) {
				perror("write kconsumerd poll");
			}
		} else {
			ERR("Didn't receive any fd");
			kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
			ret = -1;
			goto end;
		}
	}

end:
	return ret;
}

/*
 * kconsumerd_thread_poll_fds
 *
 * This thread polls the fds in the ltt_fd_list to consume the data
 * and write it to tracefile if necessary.
 */
void *kconsumerd_thread_poll_fds(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i;
	struct pollfd *pollfd = NULL;
	/* local view of the fds */
	struct kconsumerd_fd **local_kconsumerd_fd = NULL;
	/* local view of kconsumerd_data.fds_count */
	int nb_fd = 0;
	char tmp;
	int tmp2;
	struct kconsumerd_local_data *ctx = data;

	local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));

	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the ltt_fd_list has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&kconsumerd_data.lock);
		if (kconsumerd_data.need_update) {
			if (pollfd != NULL) {
				free(pollfd);
				pollfd = NULL;
			}
			if (local_kconsumerd_fd != NULL) {
				free(local_kconsumerd_fd);
				local_kconsumerd_fd = NULL;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				perror("pollfd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
					sizeof(struct kconsumerd_fd));
			if (local_kconsumerd_fd == NULL) {
				perror("local_kconsumerd_fd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			ret = kconsumerd_update_poll_array(ctx, &pollfd, local_kconsumerd_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			nb_fd = ret;
			kconsumerd_data.need_update = 0;
		}
		pthread_mutex_unlock(&kconsumerd_data.lock);

		/* poll on the array of fds */
		DBG("polling on %d fd", nb_fd + 1);
		num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			perror("Poll error");
			kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
		if (nb_fd == 0 && kconsumerd_quit == 1) {
			goto end;
		}

		/*
		 * If the kconsumerd_poll_pipe triggered poll go
		 * directly to the beginning of the loop to update the
		 * array. We want to prioritize array update over
		 * low-priority reads.
		 */
		if (pollfd[nb_fd].revents == POLLIN) {
			DBG("kconsumerd_poll_pipe wake up");
			tmp2 = read(ctx->kconsumerd_poll_pipe[0], &tmp, 1);
			if (tmp2 < 0) {
				perror("read kconsumerd poll");
			}
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			switch (pollfd[i].revents) {
			case POLLERR:
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLHUP:
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLNVAL:
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLPRI:
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
				/* it's ok to have an unavailable sub-buffer */
				if (ret == EAGAIN) {
					ret = 0;
				}
				break;
			}
		}

		/* If every buffer FD has hung up, we end the read loop here */
		if (nb_fd > 0 && num_hup == nb_fd) {
			DBG("every buffer FD has hung up\n");
			if (kconsumerd_quit == 1) {
				goto end;
			}
			continue;
		}

		/* Take care of low priority channels. */
		if (high_prio == 0) {
			for (i = 0; i < nb_fd; i++) {
				if (pollfd[i].revents == POLLIN) {
					DBG("Normal read on fd %d", pollfd[i].fd);
					ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
					/* it's ok to have an unavailable subbuffer */
					if (ret == EAGAIN) {
						ret = 0;
					}
				}
			}
		}
	}

end:
	DBG("polling thread exiting");
	if (pollfd != NULL) {
		free(pollfd);
		pollfd = NULL;
	}
	if (local_kconsumerd_fd != NULL) {
		free(local_kconsumerd_fd);
		local_kconsumerd_fd = NULL;
	}
	return NULL;
}

/*
 * kconsumerd_create
 *
 * initialise the necessary environment:
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 * Takes a function pointer as argument: this function is called when data is
 * available on a buffer. It is responsible for calling
 * kernctl_get_next_subbuf, reading the data with mmap or splice depending on
 * the buffer configuration, and then calling kernctl_put_next_subbuf at the
 * end.
 * Returns a pointer to the new context or NULL on error.
 */
struct kconsumerd_local_data *kconsumerd_create(
		int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd))
{
	int ret;
	struct kconsumerd_local_data *ctx;

	ctx = malloc(sizeof(struct kconsumerd_local_data));
	if (ctx == NULL) {
		perror("allocating context");
		goto error;
	}

	ctx->on_buffer_ready = buffer_ready;

	ret = pipe(ctx->kconsumerd_poll_pipe);
	if (ret < 0) {
		perror("Error creating poll pipe");
		goto error;
	}

	ret = pipe(ctx->kconsumerd_should_quit);
	if (ret < 0) {
		perror("Error creating recv pipe");
		goto error;
	}

	ret = pipe(ctx->kconsumerd_thread_pipe);
	if (ret < 0) {
		perror("Error creating thread pipe");
		goto error;
	}

	return ctx;

error:
	free(ctx);
	return NULL;
}
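
/*
 * Usage sketch for the callback documented above (illustrative only, not
 * part of the original file): a minimal on_buffer_ready for an mmap
 * channel. kernctl_get_subbuf_size and the file-scope example_ctx pointer
 * are assumptions; error handling is reduced to the essentials.
 */
#if 0
static struct kconsumerd_local_data *example_ctx;	/* hypothetical */

static int example_buffer_ready(struct kconsumerd_fd *kconsumerd_fd)
{
	int ret;
	unsigned long len;
	int fd = kconsumerd_fd->consumerd_fd;

	/* take exclusive ownership of the next subbuffer from the tracer */
	ret = kernctl_get_next_subbuf(fd);
	if (ret != 0) {
		return errno;	/* EAGAIN simply means no subbuffer is ready */
	}

	/* size of the payload to consume (assumed helper) */
	ret = kernctl_get_subbuf_size(fd, &len);
	if (ret != 0) {
		goto put;
	}

	/* write the data to the tracefile */
	ret = kconsumerd_on_read_subbuffer_mmap(example_ctx, kconsumerd_fd, len);

put:
	/* hand the subbuffer back to the tracer */
	kernctl_put_next_subbuf(fd);
	return ret;
}
#endif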

/*
 * kconsumerd_destroy
 *
 * Close all fds associated with the instance and free the context.
 */
void kconsumerd_destroy(struct kconsumerd_local_data *ctx)
{
	close(ctx->kconsumerd_error_socket);
	close(ctx->kconsumerd_thread_pipe[0]);
	close(ctx->kconsumerd_thread_pipe[1]);
	close(ctx->kconsumerd_poll_pipe[0]);
	close(ctx->kconsumerd_poll_pipe[1]);
	close(ctx->kconsumerd_should_quit[0]);
	close(ctx->kconsumerd_should_quit[1]);
	unlink(ctx->kconsumerd_command_sock_path);
	free(ctx);
}

/*
 * kconsumerd_thread_receive_fds
 *
 * This thread listens on the consumerd socket and
 * receives the file descriptors from ltt-sessiond.
 */
void *kconsumerd_thread_receive_fds(void *data)
{
	int sock, client_socket, ret;
	struct lttcomm_kconsumerd_header tmp;
	/*
	 * structure to poll for incoming data on communication socket
	 * avoids making blocking sockets
	 */
	struct pollfd kconsumerd_sockpoll[2];
	struct kconsumerd_local_data *ctx = data;

	DBG("Creating command socket %s", ctx->kconsumerd_command_sock_path);
	unlink(ctx->kconsumerd_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->kconsumerd_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to ltt-sessiond");
	ret = kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY);
	if (ret < 0) {
		ERR("Error sending ready command to ltt-sessiond");
		goto end;
	}

	ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	kconsumerd_sockpoll[0].fd = ctx->kconsumerd_should_quit[0];
	kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
	kconsumerd_sockpoll[1].fd = client_socket;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

	if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock <= 0) {
		ERR("On accept");
		goto end;
	}

	ret = fcntl(sock, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* update the polling structure to poll on the established socket */
	kconsumerd_sockpoll[1].fd = sock;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

	while (1) {
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}
		DBG("Incoming fds on sock");

		/* We first get the number of fd we are about to receive */
		ret = lttcomm_recv_unix_sock(sock, &tmp,
				sizeof(struct lttcomm_kconsumerd_header));
		if (ret <= 0) {
			ERR("Communication interrupted on command socket");
			goto end;
		}
		if (tmp.cmd_type == STOP) {
			DBG("Received STOP command");
			goto end;
		}
		if (kconsumerd_quit) {
			DBG("kconsumerd_thread_receive_fds received quit from signal");
			goto end;
		}

		/* we received a command to add or update fds */
		ret = kconsumerd_consumerd_recv_fd(ctx, sock, kconsumerd_sockpoll,
				tmp.payload_size, tmp.cmd_type);
		if (ret <= 0) {
			ERR("Receiving the FD, exiting");
			goto end;
		}
		DBG("received fds on sock");
	}

end:
	DBG("kconsumerd_thread_receive_fds exiting");

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	kconsumerd_quit = 1;

	/*
	 * 2s of grace period, if no polling events occur during
	 * this period, the polling thread will exit even if there
	 * are still open FDs (should not happen, but safety mechanism).
	 */
	kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;

	/* wake up the polling thread */
	ret = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
	if (ret < 0) {
		perror("poll pipe write");
	}
	return NULL;
}

/*
 * kconsumerd_cleanup
 *
 * Close all the tracefiles and stream fds, should be called when all
 * instances are destroyed.
 */
void kconsumerd_cleanup(void)
{
	struct kconsumerd_fd *iter, *tmp;

	/*
	 * close all outfd. Called when there are no more threads
	 * running (after joining on the threads), no need to protect
	 * list iteration with mutex.
	 */
	cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
		kconsumerd_del_fd(iter);
	}
}

/*
 * kconsumerd_should_exit
 *
 * Called from signal handler.
 */
void kconsumerd_should_exit(struct kconsumerd_local_data *ctx)
{
	int ret;

	kconsumerd_quit = 1;
	ret = write(ctx->kconsumerd_should_quit[1], "4", 1);
	if (ret < 0) {
		perror("write kconsumerd quit");
	}
}

/*
 * kconsumerd_send_error
 *
 * send return code to ltt-sessiond
 */
int kconsumerd_send_error(struct kconsumerd_local_data *ctx, enum lttcomm_return_code cmd)
{
	if (ctx->kconsumerd_error_socket > 0) {
		return lttcomm_send_unix_sock(ctx->kconsumerd_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}