/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; only version 2
 * of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE	/* splice(), sync_file_range() */
#endif
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <lttng-kernel-ctl.h>
#include <lttng-sessiond-comm.h>
#include <lttng/lttng-kconsumerd.h>
37 static struct lttng_kconsumerd_global_data
{
39 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
40 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It ensures
41 * the count matches the number of items in the fd_list. It ensures the
42 * list updates *always* trigger an fd_array update (therefore need to make
43 * list update vs kconsumerd_data.need_update flag update atomic, and also
44 * flag read, fd array and flag clear atomic).
48 * Number of element for the list below. Protected by kconsumerd_data.lock.
50 unsigned int fds_count
;
52 * List of FDs. Protected by kconsumerd_data.lock.
54 struct lttng_kconsumerd_fd_list fd_list
;
56 * Flag specifying if the local array of FDs needs update in the poll
57 * function. Protected by kconsumerd_data.lock.
59 unsigned int need_update
;
61 .fd_list
.head
= CDS_LIST_HEAD_INIT(kconsumerd_data
.fd_list
.head
),
/*
 * Timeout parameter (in milliseconds, as passed to poll()), to control the
 * polling thread grace period. -1 means block indefinitely.
 */
static int kconsumerd_poll_timeout = -1;
/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the kconsumerd_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (kconsumerd_should_exit()). Read by the
 * polling thread.
 */
static volatile int kconsumerd_quit = 0;
78 * Find a session fd in the global list. The kconsumerd_data.lock must be
79 * locked during this call.
81 * Return 1 if found else 0.
83 static int kconsumerd_find_session_fd(int fd
)
85 struct lttng_kconsumerd_fd
*iter
;
87 cds_list_for_each_entry(iter
, &kconsumerd_data
.fd_list
.head
, list
) {
88 if (iter
->sessiond_fd
== fd
) {
89 DBG("Duplicate session fd %d", fd
);
98 * Remove a fd from the global list protected by a mutex.
100 static void kconsumerd_del_fd(struct lttng_kconsumerd_fd
*lcf
)
103 pthread_mutex_lock(&kconsumerd_data
.lock
);
104 cds_list_del(&lcf
->list
);
105 if (kconsumerd_data
.fds_count
> 0) {
106 kconsumerd_data
.fds_count
--;
108 if (lcf
->mmap_base
!= NULL
) {
109 ret
= munmap(lcf
->mmap_base
, lcf
->mmap_len
);
114 if (lcf
->out_fd
!= 0) {
117 close(lcf
->consumerd_fd
);
122 kconsumerd_data
.need_update
= 1;
123 pthread_mutex_unlock(&kconsumerd_data
.lock
);
127 * Create a struct lttcomm_kconsumerd_msg from the
128 * information received on the receiving socket
130 struct lttng_kconsumerd_fd
*kconsumerd_allocate_fd(
131 struct lttcomm_kconsumerd_msg
*buf
,
134 struct lttng_kconsumerd_fd
*tmp_fd
;
136 tmp_fd
= malloc(sizeof(struct lttng_kconsumerd_fd
));
137 if (tmp_fd
== NULL
) {
138 perror("malloc struct lttng_kconsumerd_fd");
142 tmp_fd
->sessiond_fd
= buf
->fd
;
143 tmp_fd
->consumerd_fd
= consumerd_fd
;
144 tmp_fd
->state
= buf
->state
;
145 tmp_fd
->max_sb_size
= buf
->max_sb_size
;
147 tmp_fd
->out_fd_offset
= 0;
148 tmp_fd
->mmap_len
= 0;
149 tmp_fd
->mmap_base
= NULL
;
150 tmp_fd
->output
= buf
->output
;
151 strncpy(tmp_fd
->path_name
, buf
->path_name
, PATH_MAX
);
152 tmp_fd
->path_name
[PATH_MAX
- 1] = '\0';
153 DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
154 tmp_fd
->path_name
, tmp_fd
->sessiond_fd
,
155 tmp_fd
->consumerd_fd
, tmp_fd
->out_fd
);
162 * Add a fd to the global list protected by a mutex.
164 static int kconsumerd_add_fd(struct lttng_kconsumerd_fd
*tmp_fd
)
168 pthread_mutex_lock(&kconsumerd_data
.lock
);
169 /* Check if already exist */
170 ret
= kconsumerd_find_session_fd(tmp_fd
->sessiond_fd
);
174 cds_list_add(&tmp_fd
->list
, &kconsumerd_data
.fd_list
.head
);
175 kconsumerd_data
.fds_count
++;
176 kconsumerd_data
.need_update
= 1;
179 pthread_mutex_unlock(&kconsumerd_data
.lock
);
184 * Update a fd according to what we just received.
186 static void kconsumerd_change_fd_state(int sessiond_fd
,
187 enum lttng_kconsumerd_fd_state state
)
189 struct lttng_kconsumerd_fd
*iter
;
191 pthread_mutex_lock(&kconsumerd_data
.lock
);
192 cds_list_for_each_entry(iter
, &kconsumerd_data
.fd_list
.head
, list
) {
193 if (iter
->sessiond_fd
== sessiond_fd
) {
198 kconsumerd_data
.need_update
= 1;
199 pthread_mutex_unlock(&kconsumerd_data
.lock
);
203 * Allocate the pollfd structure and the local view of the out fds to avoid
204 * doing a lookup in the linked list and concurrency issues when writing is
205 * needed. Called with kconsumerd_data.lock held.
207 * Returns the number of fds in the structures.
209 static int kconsumerd_update_poll_array(
210 struct lttng_kconsumerd_local_data
*ctx
, struct pollfd
**pollfd
,
211 struct lttng_kconsumerd_fd
**local_kconsumerd_fd
)
213 struct lttng_kconsumerd_fd
*iter
;
216 DBG("Updating poll fd array");
217 cds_list_for_each_entry(iter
, &kconsumerd_data
.fd_list
.head
, list
) {
218 if (iter
->state
== ACTIVE_FD
) {
219 DBG("Active FD %d", iter
->consumerd_fd
);
220 (*pollfd
)[i
].fd
= iter
->consumerd_fd
;
221 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
222 local_kconsumerd_fd
[i
] = iter
;
228 * Insert the kconsumerd_poll_pipe at the end of the array and don't
229 * increment i so nb_fd is the number of real FD.
231 (*pollfd
)[i
].fd
= ctx
->kconsumerd_poll_pipe
[0];
232 (*pollfd
)[i
].events
= POLLIN
;
237 * Receives an array of file descriptors and the associated structures
238 * describing each fd (path name).
240 * Returns the size of received data
242 static int kconsumerd_consumerd_recv_fd(
243 struct lttng_kconsumerd_local_data
*ctx
, int sfd
,
244 struct pollfd
*kconsumerd_sockpoll
, int size
,
245 enum lttng_kconsumerd_command cmd_type
)
248 int ret
= 0, i
, j
, tmp2
;
249 struct cmsghdr
*cmsg
;
251 char recv_fd
[CMSG_SPACE(sizeof(int))];
252 struct lttcomm_kconsumerd_msg lkm
;
253 struct lttng_kconsumerd_fd
*new_fd
;
259 /* the number of fds we are about to receive */
260 nb_fd
= size
/ sizeof(struct lttcomm_kconsumerd_msg
);
263 * nb_fd is the number of fds we receive. One fd per recvmsg.
265 for (i
= 0; i
< nb_fd
; i
++) {
266 struct msghdr msg
= { 0 };
268 /* Prepare to receive the structures */
269 iov
[0].iov_base
= &lkm
;
270 iov
[0].iov_len
= sizeof(lkm
);
274 msg
.msg_control
= recv_fd
;
275 msg
.msg_controllen
= sizeof(recv_fd
);
277 DBG("Waiting to receive fd");
278 if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll
) < 0) {
282 if ((ret
= recvmsg(sfd
, &msg
, 0)) < 0) {
287 if (ret
!= (size
/ nb_fd
)) {
288 ERR("Received only %d, expected %d", ret
, size
);
289 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_ERROR_RECV_FD
);
293 cmsg
= CMSG_FIRSTHDR(&msg
);
295 ERR("Invalid control message header");
297 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_ERROR_RECV_FD
);
301 /* if we received fds */
302 if (cmsg
->cmsg_level
== SOL_SOCKET
&& cmsg
->cmsg_type
== SCM_RIGHTS
) {
305 for (j
= 0; j
< sizeof(int); j
++)
306 tmp
.vc
[j
] = CMSG_DATA(cmsg
)[j
];
307 DBG("kconsumerd_add_fd %s (%d)", lkm
.path_name
, tmp
.vi
);
308 new_fd
= kconsumerd_allocate_fd(&lkm
, tmp
.vi
);
309 if (new_fd
== NULL
) {
310 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_OUTFD_ERROR
);
314 if (ctx
->on_recv_fd
!= NULL
) {
315 ret
= ctx
->on_recv_fd(new_fd
);
317 kconsumerd_add_fd(new_fd
);
318 } else if (ret
< 0) {
322 kconsumerd_add_fd(new_fd
);
326 if (ctx
->on_update_fd
!= NULL
) {
327 ret
= ctx
->on_update_fd(lkm
.fd
, lkm
.state
);
329 kconsumerd_change_fd_state(lkm
.fd
, lkm
.state
);
330 } else if (ret
< 0) {
334 kconsumerd_change_fd_state(lkm
.fd
, lkm
.state
);
340 /* signal the poll thread */
341 tmp2
= write(ctx
->kconsumerd_poll_pipe
[1], "4", 1);
343 perror("write kconsumerd poll");
346 ERR("Didn't received any fd");
347 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_ERROR_RECV_FD
);
358 * Set the error socket.
360 void lttng_kconsumerd_set_error_sock(
361 struct lttng_kconsumerd_local_data
*ctx
, int sock
)
363 ctx
->kconsumerd_error_socket
= sock
;
367 * Set the command socket path.
370 void lttng_kconsumerd_set_command_sock_path(
371 struct lttng_kconsumerd_local_data
*ctx
, char *sock
)
373 ctx
->kconsumerd_command_sock_path
= sock
;
376 static void lttng_kconsumerd_sync_trace_file(
377 struct lttng_kconsumerd_fd
*kconsumerd_fd
, off_t orig_offset
)
379 int outfd
= kconsumerd_fd
->out_fd
;
381 * This does a blocking write-and-wait on any page that belongs to the
382 * subbuffer prior to the one we just wrote.
383 * Don't care about error values, as these are just hints and ways to
384 * limit the amount of page cache used.
386 if (orig_offset
>= kconsumerd_fd
->max_sb_size
) {
387 sync_file_range(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
388 kconsumerd_fd
->max_sb_size
,
389 SYNC_FILE_RANGE_WAIT_BEFORE
390 | SYNC_FILE_RANGE_WRITE
391 | SYNC_FILE_RANGE_WAIT_AFTER
);
393 * Give hints to the kernel about how we access the file:
394 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
397 * We need to call fadvise again after the file grows because the
398 * kernel does not seem to apply fadvise to non-existing parts of the
401 * Call fadvise _after_ having waited for the page writeback to
402 * complete because the dirty page writeback semantic is not well
403 * defined. So it can be expected to lead to lower throughput in
406 posix_fadvise(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
407 kconsumerd_fd
->max_sb_size
, POSIX_FADV_DONTNEED
);
413 * Mmap the ring buffer, read it and write the data to the tracefile.
415 * Returns the number of bytes written
417 int lttng_kconsumerd_on_read_subbuffer_mmap(
418 struct lttng_kconsumerd_local_data
*ctx
,
419 struct lttng_kconsumerd_fd
*kconsumerd_fd
, unsigned long len
)
421 unsigned long mmap_offset
;
423 off_t orig_offset
= kconsumerd_fd
->out_fd_offset
;
424 int fd
= kconsumerd_fd
->consumerd_fd
;
425 int outfd
= kconsumerd_fd
->out_fd
;
427 /* get the offset inside the fd to mmap */
428 ret
= kernctl_get_mmap_read_offset(fd
, &mmap_offset
);
431 perror("kernctl_get_mmap_read_offset");
436 ret
= write(outfd
, kconsumerd_fd
->mmap_base
+ mmap_offset
, len
);
439 } else if (ret
< 0) {
441 perror("Error in file write");
444 /* This won't block, but will start writeout asynchronously */
445 sync_file_range(outfd
, kconsumerd_fd
->out_fd_offset
, ret
,
446 SYNC_FILE_RANGE_WRITE
);
447 kconsumerd_fd
->out_fd_offset
+= ret
;
450 lttng_kconsumerd_sync_trace_file(kconsumerd_fd
, orig_offset
);
459 * Splice the data from the ring buffer to the tracefile.
461 * Returns the number of bytes spliced.
463 int lttng_kconsumerd_on_read_subbuffer_splice(
464 struct lttng_kconsumerd_local_data
*ctx
,
465 struct lttng_kconsumerd_fd
*kconsumerd_fd
, unsigned long len
)
469 off_t orig_offset
= kconsumerd_fd
->out_fd_offset
;
470 int fd
= kconsumerd_fd
->consumerd_fd
;
471 int outfd
= kconsumerd_fd
->out_fd
;
474 DBG("splice chan to pipe offset %lu (fd : %d)",
475 (unsigned long)offset
, fd
);
476 ret
= splice(fd
, &offset
, ctx
->kconsumerd_thread_pipe
[1], NULL
, len
,
477 SPLICE_F_MOVE
| SPLICE_F_MORE
);
478 DBG("splice chan to pipe ret %ld", ret
);
481 perror("Error in relay splice");
485 ret
= splice(ctx
->kconsumerd_thread_pipe
[0], NULL
, outfd
, NULL
, ret
,
486 SPLICE_F_MOVE
| SPLICE_F_MORE
);
487 DBG("splice pipe to file %ld", ret
);
490 perror("Error in file splice");
494 /* This won't block, but will start writeout asynchronously */
495 sync_file_range(outfd
, kconsumerd_fd
->out_fd_offset
, ret
,
496 SYNC_FILE_RANGE_WRITE
);
497 kconsumerd_fd
->out_fd_offset
+= ret
;
499 lttng_kconsumerd_sync_trace_file(kconsumerd_fd
, orig_offset
);
504 /* send the appropriate error description to sessiond */
507 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_EBADF
);
510 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_EINVAL
);
513 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_ENOMEM
);
516 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_ESPIPE
);
525 * Take a snapshot for a specific fd
527 * Returns 0 on success, < 0 on error
529 int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data
*ctx
,
530 struct lttng_kconsumerd_fd
*kconsumerd_fd
)
533 int infd
= kconsumerd_fd
->consumerd_fd
;
535 ret
= kernctl_snapshot(infd
);
538 perror("Getting sub-buffer snapshot.");
545 * Get the produced position
547 * Returns 0 on success, < 0 on error
549 int lttng_kconsumerd_get_produced_snapshot(
550 struct lttng_kconsumerd_local_data
*ctx
,
551 struct lttng_kconsumerd_fd
*kconsumerd_fd
,
555 int infd
= kconsumerd_fd
->consumerd_fd
;
557 ret
= kernctl_snapshot_get_produced(infd
, pos
);
560 perror("kernctl_snapshot_get_produced");
/*
 * Poll on the should_quit pipe and the command socket return -1 on error and
 * should exit, 0 if data is available on the command socket
 *
 * kconsumerd_sockpoll: array of two entries; entry 0 is the should_quit
 * pipe, entry 1 the command socket. Blocks without timeout.
 */
int lttng_kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
{
	int num_rdy;

	num_rdy = poll(kconsumerd_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	if (kconsumerd_sockpoll[0].revents == POLLIN) {
		DBG("kconsumerd_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
590 * This thread polls the fds in the ltt_fd_list to consume the data and write
591 * it to tracefile if necessary.
593 void *lttng_kconsumerd_thread_poll_fds(void *data
)
595 int num_rdy
, num_hup
, high_prio
, ret
, i
;
596 struct pollfd
*pollfd
= NULL
;
597 /* local view of the fds */
598 struct lttng_kconsumerd_fd
**local_kconsumerd_fd
= NULL
;
599 /* local view of kconsumerd_data.fds_count */
603 struct lttng_kconsumerd_local_data
*ctx
= data
;
606 local_kconsumerd_fd
= malloc(sizeof(struct lttng_kconsumerd_fd
));
613 * the ltt_fd_list has been updated, we need to update our
614 * local array as well
616 pthread_mutex_lock(&kconsumerd_data
.lock
);
617 if (kconsumerd_data
.need_update
) {
618 if (pollfd
!= NULL
) {
622 if (local_kconsumerd_fd
!= NULL
) {
623 free(local_kconsumerd_fd
);
624 local_kconsumerd_fd
= NULL
;
627 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
628 pollfd
= malloc((kconsumerd_data
.fds_count
+ 1) * sizeof(struct pollfd
));
629 if (pollfd
== NULL
) {
630 perror("pollfd malloc");
631 pthread_mutex_unlock(&kconsumerd_data
.lock
);
635 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
636 local_kconsumerd_fd
= malloc((kconsumerd_data
.fds_count
+ 1) *
637 sizeof(struct lttng_kconsumerd_fd
));
638 if (local_kconsumerd_fd
== NULL
) {
639 perror("local_kconsumerd_fd malloc");
640 pthread_mutex_unlock(&kconsumerd_data
.lock
);
643 ret
= kconsumerd_update_poll_array(ctx
, &pollfd
, local_kconsumerd_fd
);
645 ERR("Error in allocating pollfd or local_outfds");
646 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_POLL_ERROR
);
647 pthread_mutex_unlock(&kconsumerd_data
.lock
);
651 kconsumerd_data
.need_update
= 0;
653 pthread_mutex_unlock(&kconsumerd_data
.lock
);
655 /* poll on the array of fds */
656 DBG("polling on %d fd", nb_fd
+ 1);
657 num_rdy
= poll(pollfd
, nb_fd
+ 1, kconsumerd_poll_timeout
);
658 DBG("poll num_rdy : %d", num_rdy
);
660 perror("Poll error");
661 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_POLL_ERROR
);
663 } else if (num_rdy
== 0) {
664 DBG("Polling thread timed out");
668 /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
669 if (nb_fd
== 0 && kconsumerd_quit
== 1) {
674 * If the kconsumerd_poll_pipe triggered poll go
675 * directly to the beginning of the loop to update the
676 * array. We want to prioritize array update over
677 * low-priority reads.
679 if (pollfd
[nb_fd
].revents
== POLLIN
) {
680 DBG("kconsumerd_poll_pipe wake up");
681 tmp2
= read(ctx
->kconsumerd_poll_pipe
[0], &tmp
, 1);
683 perror("read kconsumerd poll");
688 /* Take care of high priority channels first. */
689 for (i
= 0; i
< nb_fd
; i
++) {
690 switch(pollfd
[i
].revents
) {
692 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
693 kconsumerd_del_fd(local_kconsumerd_fd
[i
]);
697 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
698 kconsumerd_del_fd(local_kconsumerd_fd
[i
]);
702 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
703 kconsumerd_del_fd(local_kconsumerd_fd
[i
]);
707 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
709 ret
= ctx
->on_buffer_ready(local_kconsumerd_fd
[i
]);
710 /* it's ok to have an unavailable sub-buffer */
718 /* If every buffer FD has hung up, we end the read loop here */
719 if (nb_fd
> 0 && num_hup
== nb_fd
) {
720 DBG("every buffer FD has hung up\n");
721 if (kconsumerd_quit
== 1) {
727 /* Take care of low priority channels. */
728 if (high_prio
== 0) {
729 for (i
= 0; i
< nb_fd
; i
++) {
730 if (pollfd
[i
].revents
== POLLIN
) {
731 DBG("Normal read on fd %d", pollfd
[i
].fd
);
732 ret
= ctx
->on_buffer_ready(local_kconsumerd_fd
[i
]);
733 /* it's ok to have an unavailable subbuffer */
742 DBG("polling thread exiting");
743 if (pollfd
!= NULL
) {
747 if (local_kconsumerd_fd
!= NULL
) {
748 free(local_kconsumerd_fd
);
749 local_kconsumerd_fd
= NULL
;
755 * Initialise the necessary environnement :
756 * - create a new context
757 * - create the poll_pipe
758 * - create the should_quit pipe (for signal handler)
759 * - create the thread pipe (for splice)
761 * Takes a function pointer as argument, this function is called when data is
762 * available on a buffer. This function is responsible to do the
763 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
764 * buffer configuration and then kernctl_put_next_subbuf at the end.
766 * Returns a pointer to the new context or NULL on error.
768 struct lttng_kconsumerd_local_data
*lttng_kconsumerd_create(
769 int (*buffer_ready
)(struct lttng_kconsumerd_fd
*kconsumerd_fd
),
770 int (*recv_fd
)(struct lttng_kconsumerd_fd
*kconsumerd_fd
),
771 int (*update_fd
)(int sessiond_fd
, uint32_t state
))
774 struct lttng_kconsumerd_local_data
*ctx
;
776 ctx
= malloc(sizeof(struct lttng_kconsumerd_local_data
));
778 perror("allocating context");
782 ctx
->kconsumerd_error_socket
= -1;
783 /* assign the callbacks */
784 ctx
->on_buffer_ready
= buffer_ready
;
785 ctx
->on_recv_fd
= recv_fd
;
786 ctx
->on_update_fd
= update_fd
;
788 ret
= pipe(ctx
->kconsumerd_poll_pipe
);
790 perror("Error creating poll pipe");
791 goto error_poll_pipe
;
794 ret
= pipe(ctx
->kconsumerd_should_quit
);
796 perror("Error creating recv pipe");
797 goto error_quit_pipe
;
800 ret
= pipe(ctx
->kconsumerd_thread_pipe
);
802 perror("Error creating thread pipe");
803 goto error_thread_pipe
;
810 for (i
= 0; i
< 2; i
++) {
813 err
= close(ctx
->kconsumerd_should_quit
[i
]);
817 for (i
= 0; i
< 2; i
++) {
820 err
= close(ctx
->kconsumerd_poll_pipe
[i
]);
830 * Close all fds associated with the instance and free the context.
832 void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data
*ctx
)
834 close(ctx
->kconsumerd_error_socket
);
835 close(ctx
->kconsumerd_thread_pipe
[0]);
836 close(ctx
->kconsumerd_thread_pipe
[1]);
837 close(ctx
->kconsumerd_poll_pipe
[0]);
838 close(ctx
->kconsumerd_poll_pipe
[1]);
839 close(ctx
->kconsumerd_should_quit
[0]);
840 close(ctx
->kconsumerd_should_quit
[1]);
841 unlink(ctx
->kconsumerd_command_sock_path
);
847 * This thread listens on the consumerd socket and receives the file
848 * descriptors from the session daemon.
850 void *lttng_kconsumerd_thread_receive_fds(void *data
)
852 int sock
, client_socket
, ret
;
853 struct lttcomm_kconsumerd_header tmp
;
855 * structure to poll for incoming data on communication socket avoids
856 * making blocking sockets.
858 struct pollfd kconsumerd_sockpoll
[2];
859 struct lttng_kconsumerd_local_data
*ctx
= data
;
862 DBG("Creating command socket %s", ctx
->kconsumerd_command_sock_path
);
863 unlink(ctx
->kconsumerd_command_sock_path
);
864 client_socket
= lttcomm_create_unix_sock(ctx
->kconsumerd_command_sock_path
);
865 if (client_socket
< 0) {
866 ERR("Cannot create command socket");
870 ret
= lttcomm_listen_unix_sock(client_socket
);
875 DBG("Sending ready command to ltt-sessiond");
876 ret
= lttng_kconsumerd_send_error(ctx
, KCONSUMERD_COMMAND_SOCK_READY
);
877 /* return < 0 on error, but == 0 is not fatal */
879 ERR("Error sending ready command to ltt-sessiond");
883 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
885 perror("fcntl O_NONBLOCK");
889 /* prepare the FDs to poll : to client socket and the should_quit pipe */
890 kconsumerd_sockpoll
[0].fd
= ctx
->kconsumerd_should_quit
[0];
891 kconsumerd_sockpoll
[0].events
= POLLIN
| POLLPRI
;
892 kconsumerd_sockpoll
[1].fd
= client_socket
;
893 kconsumerd_sockpoll
[1].events
= POLLIN
| POLLPRI
;
895 if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll
) < 0) {
898 DBG("Connection on client_socket");
900 /* Blocking call, waiting for transmission */
901 sock
= lttcomm_accept_unix_sock(client_socket
);
906 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
908 perror("fcntl O_NONBLOCK");
912 /* update the polling structure to poll on the established socket */
913 kconsumerd_sockpoll
[1].fd
= sock
;
914 kconsumerd_sockpoll
[1].events
= POLLIN
| POLLPRI
;
917 if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll
) < 0) {
920 DBG("Incoming fds on sock");
922 /* We first get the number of fd we are about to receive */
923 ret
= lttcomm_recv_unix_sock(sock
, &tmp
,
924 sizeof(struct lttcomm_kconsumerd_header
));
926 ERR("Communication interrupted on command socket");
929 if (tmp
.cmd_type
== STOP
) {
930 DBG("Received STOP command");
933 if (kconsumerd_quit
) {
934 DBG("kconsumerd_thread_receive_fds received quit from signal");
938 /* we received a command to add or update fds */
939 ret
= kconsumerd_consumerd_recv_fd(ctx
, sock
, kconsumerd_sockpoll
,
940 tmp
.payload_size
, tmp
.cmd_type
);
942 ERR("Receiving the FD, exiting");
945 DBG("received fds on sock");
949 DBG("kconsumerd_thread_receive_fds exiting");
952 * when all fds have hung up, the polling thread
958 * 2s of grace period, if no polling events occur during
959 * this period, the polling thread will exit even if there
960 * are still open FDs (should not happen, but safety mechanism).
962 kconsumerd_poll_timeout
= LTTNG_KCONSUMERD_POLL_GRACE_PERIOD
;
964 /* wake up the polling thread */
965 ret
= write(ctx
->kconsumerd_poll_pipe
[1], "4", 1);
967 perror("poll pipe write");
973 * Close all the tracefiles and stream fds, should be called when all instances
976 void lttng_kconsumerd_cleanup(void)
978 struct lttng_kconsumerd_fd
*iter
, *tmp
;
981 * close all outfd. Called when there are no more threads
982 * running (after joining on the threads), no need to protect
983 * list iteration with mutex.
985 cds_list_for_each_entry_safe(iter
, tmp
,
986 &kconsumerd_data
.fd_list
.head
, list
) {
987 kconsumerd_del_fd(iter
);
992 * Called from signal handler.
994 void lttng_kconsumerd_should_exit(struct lttng_kconsumerd_local_data
*ctx
)
998 ret
= write(ctx
->kconsumerd_should_quit
[1], "4", 1);
1000 perror("write kconsumerd quit");
1005 * Send return code to the session daemon.
1006 * If the socket is not defined, we return 0, it is not a fatal error
1008 int lttng_kconsumerd_send_error(
1009 struct lttng_kconsumerd_local_data
*ctx
, int cmd
)
1011 if (ctx
->kconsumerd_error_socket
> 0) {
1012 return lttcomm_send_unix_sock(ctx
->kconsumerd_error_socket
, &cmd
,
1013 sizeof(enum lttcomm_sessiond_command
));