default channel selection cleanup
[lttng-tools.git] / kconsumerd / kconsumerd.c
CommitLineData
d4a1283e
JD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <fcntl.h>
22#include <getopt.h>
23#include <grp.h>
24#include <limits.h>
25#include <pthread.h>
26#include <signal.h>
27#include <stdio.h>
28#include <stdlib.h>
29#include <string.h>
30#include <sys/ipc.h>
31#include <sys/shm.h>
32#include <sys/socket.h>
33#include <sys/stat.h>
34#include <sys/types.h>
35#include <urcu/list.h>
36#include <poll.h>
37#include <unistd.h>
03424a9b 38#include <sys/mman.h>
d4a1283e
JD
39
40#include "lttngerr.h"
41#include "libkernelctl.h"
42#include "liblttsessiondcomm.h"
43#include "kconsumerd.h"
44
45/* Init the list of FDs */
46static struct ltt_kconsumerd_fd_list kconsumerd_fd_list = {
47 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
48};
49
50/* Number of element for the list below. */
51static unsigned int fds_count;
52
53/* If the local array of FDs needs update in the poll function */
54static unsigned int update_fd_array = 1;
55
56/* lock the fd array and structures */
57static pthread_mutex_t kconsumerd_lock_fds;
58
59/* the two threads (receive fd and poll) */
60static pthread_t threads[2];
61
62/* communication with splice */
63static int thread_pipe[2];
64
252fd492
JD
65/* pipe to wake the poll thread when necessary */
66static int poll_pipe[2];
67
d4a1283e
JD
68/* socket to communicate errors with sessiond */
69static int error_socket = -1;
70
13e44745
JD
71/* to count the number of time the user pressed ctrl+c */
72static int sigintcount = 0;
73
9d26659a
JD
74/* flag to inform the polling thread to quit when all fd hung up */
75static int quit = 0;
76
d4a1283e
JD
77/* Argument variables */
78int opt_quiet;
79int opt_verbose;
80static int opt_daemon;
81static const char *progname;
82static char command_sock_path[PATH_MAX]; /* Global command socket path */
83static char error_sock_path[PATH_MAX]; /* Global error path */
84
bcd8d9db
JD
85/*
86 * del_fd
87 *
88 * Remove a fd from the global list protected by a mutex
89 */
90static void del_fd(struct ltt_kconsumerd_fd *lcf)
91{
6aea26bc 92 DBG("Removing %d", lcf->consumerd_fd);
bcd8d9db
JD
93 pthread_mutex_lock(&kconsumerd_lock_fds);
94 cds_list_del(&lcf->list);
95 if (fds_count > 0) {
96 fds_count--;
97 DBG("Removed ltt_kconsumerd_fd");
98 if (lcf != NULL) {
99 close(lcf->out_fd);
100 close(lcf->consumerd_fd);
101 free(lcf);
102 lcf = NULL;
103 }
104 }
105 pthread_mutex_unlock(&kconsumerd_lock_fds);
106}
107
d4a1283e
JD
108/*
109 * cleanup
110 *
111 * Cleanup the daemon's socket on exit
112 */
113static void cleanup()
114{
bcd8d9db
JD
115 struct ltt_kconsumerd_fd *iter;
116
bcd8d9db 117 /* remove the socket file */
d4a1283e 118 unlink(command_sock_path);
bcd8d9db
JD
119
120 /* unblock the threads */
121 WARN("Terminating the threads before exiting");
122 pthread_cancel(threads[0]);
123 pthread_cancel(threads[1]);
124
125 /* close all outfd */
126 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
127 del_fd(iter);
128 }
d4a1283e
JD
129}
130
6aea26bc
JD
131/*
132 * send_error
d4a1283e
JD
133 *
134 * send return code to ltt-sessiond
135 */
136static int send_error(enum lttcomm_return_code cmd)
137{
138 if (error_socket > 0) {
139 return lttcomm_send_unix_sock(error_socket, &cmd,
140 sizeof(enum lttcomm_sessiond_command));
141 } else {
142 return 0;
143 }
144}
145
d4a1283e
JD
146/*
147 * add_fd
148 *
149 * Add a fd to the global list protected by a mutex
150 */
151static int add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
152{
153 struct ltt_kconsumerd_fd *tmp_fd;
154 int ret;
155
156 tmp_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
157 tmp_fd->sessiond_fd = buf->fd;
158 tmp_fd->consumerd_fd = consumerd_fd;
159 tmp_fd->state = buf->state;
160 tmp_fd->max_sb_size = buf->max_sb_size;
161 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
162
163 /* Opening the tracefile in write mode */
164 DBG("Opening %s for writing", tmp_fd->path_name);
165 ret = open(tmp_fd->path_name,
46258765 166 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
d4a1283e
JD
167 if (ret < 0) {
168 ERR("Opening %s", tmp_fd->path_name);
169 perror("open");
170 goto end;
171 }
172 tmp_fd->out_fd = ret;
173 tmp_fd->out_fd_offset = 0;
174
175 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
176 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
177
178 pthread_mutex_lock(&kconsumerd_lock_fds);
179 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
180 fds_count++;
181 pthread_mutex_unlock(&kconsumerd_lock_fds);
182
183end:
184 return ret;
185}
186
d4a1283e
JD
187
188/*
189 * sighandler
190 *
191 * Signal handler for the daemon
192 */
193static void sighandler(int sig)
194{
13e44745
JD
195 if (sig == SIGINT && sigintcount++ == 0) {
196 DBG("ignoring first SIGINT");
197 return;
198 }
199
d4a1283e
JD
200 cleanup();
201
202 return;
203}
204
205/*
206 * set_signal_handler
207 *
208 * Setup signal handler for :
209 * SIGINT, SIGTERM, SIGPIPE
210 */
211static int set_signal_handler(void)
212{
213 int ret = 0;
214 struct sigaction sa;
215 sigset_t sigset;
216
217 if ((ret = sigemptyset(&sigset)) < 0) {
218 perror("sigemptyset");
219 return ret;
220 }
221
222 sa.sa_handler = sighandler;
223 sa.sa_mask = sigset;
224 sa.sa_flags = 0;
225 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
226 perror("sigaction");
227 return ret;
228 }
229
230 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
231 perror("sigaction");
232 return ret;
233 }
234
235 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
236 perror("sigaction");
237 return ret;
238 }
239
240 return ret;
241}
242
03424a9b
JD
243/*
244 * on_read_subbuffer_mmap
245 *
246 * mmap the ring buffer, read it and write the data to the tracefile.
247 * Returns the number of bytes written
248 */
249static int on_read_subbuffer_mmap(struct ltt_kconsumerd_fd *kconsumerd_fd,
250 unsigned long len)
251{
252 unsigned long mmap_len;
253 unsigned long mmap_offset;
254 unsigned long padded_len;
255 unsigned long padding_len;
256 char *mmap_base;
257 char *padding = NULL;
258 long ret = 0;
259 off_t orig_offset = kconsumerd_fd->out_fd_offset;
260 int fd = kconsumerd_fd->consumerd_fd;
261 int outfd = kconsumerd_fd->out_fd;
262
263 /* get the padded subbuffer size to know the padding required */
264 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
265 if (ret != 0) {
266 ret = errno;
267 perror("kernctl_get_padded_subbuf_size");
268 goto end;
269 }
270 padding_len = padded_len - len;
271 padding = malloc(padding_len * sizeof(char));
272 memset(padding, '\0', padding_len);
273
274 /* get the len of the mmap region */
275 ret = kernctl_get_mmap_len(fd, &mmap_len);
276 if (ret != 0) {
277 ret = errno;
278 perror("kernctl_get_mmap_len");
279 goto end;
280 }
281
282 /* get the offset inside the fd to mmap */
283 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
284 if (ret != 0) {
285 ret = errno;
286 perror("kernctl_get_mmap_read_offset");
287 goto end;
288 }
289
290 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
291 if (mmap_base == MAP_FAILED) {
292 perror("Error mmaping");
293 ret = -1;
294 goto end;
295 }
296
297 while (len > 0) {
298 ret = write(outfd, mmap_base, len);
299 if (ret >= len) {
300 len = 0;
301 } else if (ret < 0) {
302 ret = errno;
303 perror("Error in file write");
304 goto end;
305 }
306 /* This won't block, but will start writeout asynchronously */
307 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
308 SYNC_FILE_RANGE_WRITE);
309 kconsumerd_fd->out_fd_offset += ret;
310 }
311
312 /* once all the data is written, write the padding to disk */
313 ret = write(outfd, padding, padding_len);
314 if (ret < 0) {
315 ret = errno;
316 perror("Error writing padding to file");
317 goto end;
318 }
319
320 /*
321 * This does a blocking write-and-wait on any page that belongs to the
322 * subbuffer prior to the one we just wrote.
323 * Don't care about error values, as these are just hints and ways to
324 * limit the amount of page cache used.
325 */
326 if (orig_offset >= kconsumerd_fd->max_sb_size) {
327 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
328 kconsumerd_fd->max_sb_size,
329 SYNC_FILE_RANGE_WAIT_BEFORE
330 | SYNC_FILE_RANGE_WRITE
331 | SYNC_FILE_RANGE_WAIT_AFTER);
332 /*
333 * Give hints to the kernel about how we access the file:
334 * POSIX_FADV_DONTNEED : we won't re-access data in a near
335 * future after we write it.
336 * We need to call fadvise again after the file grows because
337 * the kernel does not seem to apply fadvise to non-existing
338 * parts of the file.
339 * Call fadvise _after_ having waited for the page writeback to
340 * complete because the dirty page writeback semantic is not
341 * well defined. So it can be expected to lead to lower
342 * throughput in streaming.
343 */
344 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
345 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
346 }
347 goto end;
348
349end:
350 if (padding != NULL) {
351 free(padding);
352 }
353 return ret;
354}
355
d4a1283e
JD
356/*
357 * on_read_subbuffer
358 *
359 * Splice the data from the ring buffer to the tracefile.
360 * Returns the number of bytes spliced
361 */
362static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd,
363 unsigned long len)
364{
365 long ret = 0;
366 loff_t offset = 0;
367 off_t orig_offset = kconsumerd_fd->out_fd_offset;
368 int fd = kconsumerd_fd->consumerd_fd;
369 int outfd = kconsumerd_fd->out_fd;
370
371 while (len > 0) {
372 DBG("splice chan to pipe offset %lu (fd : %d)",
373 (unsigned long)offset, fd);
374 ret = splice(fd, &offset, thread_pipe[1], NULL, len,
375 SPLICE_F_MOVE | SPLICE_F_MORE);
376 DBG("splice chan to pipe ret %ld", ret);
377 if (ret < 0) {
0632499a 378 ret = errno;
d4a1283e 379 perror("Error in relay splice");
0632499a 380 goto splice_error;
d4a1283e
JD
381 }
382
383 ret = splice(thread_pipe[0], NULL, outfd, NULL, ret,
384 SPLICE_F_MOVE | SPLICE_F_MORE);
385 DBG("splice pipe to file %ld", ret);
386 if (ret < 0) {
0632499a 387 ret = errno;
d4a1283e 388 perror("Error in file splice");
0632499a 389 goto splice_error;
d4a1283e
JD
390 }
391 if (ret >= len) {
392 len = 0;
393 }
394 /* This won't block, but will start writeout asynchronously */
395 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
396 SYNC_FILE_RANGE_WRITE);
397 kconsumerd_fd->out_fd_offset += ret;
398 }
0632499a 399
d4a1283e
JD
400 /*
401 * This does a blocking write-and-wait on any page that belongs to the
402 * subbuffer prior to the one we just wrote.
403 * Don't care about error values, as these are just hints and ways to
404 * limit the amount of page cache used.
405 */
406 if (orig_offset >= kconsumerd_fd->max_sb_size) {
407 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
408 kconsumerd_fd->max_sb_size,
409 SYNC_FILE_RANGE_WAIT_BEFORE
410 | SYNC_FILE_RANGE_WRITE
411 | SYNC_FILE_RANGE_WAIT_AFTER);
412 /*
413 * Give hints to the kernel about how we access the file:
414 * POSIX_FADV_DONTNEED : we won't re-access data in a near
415 * future after we write it.
416 * We need to call fadvise again after the file grows because
417 * the kernel does not seem to apply fadvise to non-existing
418 * parts of the file.
419 * Call fadvise _after_ having waited for the page writeback to
420 * complete because the dirty page writeback semantic is not
421 * well defined. So it can be expected to lead to lower
422 * throughput in streaming.
423 */
424 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
425 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
426 }
0632499a
JD
427 goto end;
428
429splice_error:
430 /* send the appropriate error description to sessiond */
431 switch(ret) {
914a571b
JD
432 case EBADF:
433 send_error(KCONSUMERD_SPLICE_EBADF);
434 break;
435 case EINVAL:
436 send_error(KCONSUMERD_SPLICE_EINVAL);
437 break;
438 case ENOMEM:
439 send_error(KCONSUMERD_SPLICE_ENOMEM);
440 break;
441 case ESPIPE:
442 send_error(KCONSUMERD_SPLICE_ESPIPE);
443 break;
0632499a
JD
444 }
445
446end:
d4a1283e
JD
447 return ret;
448}
449
450/*
451 * read_subbuffer
452 *
453 * Consume data on a file descriptor and write it on a trace file
454 */
455static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd)
456{
457 unsigned long len;
458 int err;
459 long ret = 0;
460 int infd = kconsumerd_fd->consumerd_fd;
461
6aea26bc 462 DBG("In read_subbuffer (infd : %d)", infd);
d4a1283e
JD
463 /* Get the next subbuffer */
464 err = kernctl_get_next_subbuf(infd);
465 if (err != 0) {
466 ret = errno;
467 perror("Reserving sub buffer failed (everything is normal, "
468 "it is due to concurrency)");
469 goto end;
470 }
471
7d452e12
MD
472 switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
473 case LTTNG_KERNEL_SPLICE:
03424a9b
JD
474 /* read the whole subbuffer */
475 err = kernctl_get_padded_subbuf_size(infd, &len);
476 if (err != 0) {
477 ret = errno;
478 perror("Getting sub-buffer len failed.");
479 goto end;
480 }
d4a1283e 481
03424a9b
JD
482 /* splice the subbuffer to the tracefile */
483 ret = on_read_subbuffer(kconsumerd_fd, len);
484 if (ret < 0) {
485 /*
486 * display the error but continue processing to try
487 * to release the subbuffer
488 */
489 ERR("Error splicing to tracefile");
490 }
7d452e12
MD
491 break;
492 case LTTNG_KERNEL_MMAP:
03424a9b
JD
493 /* read the used subbuffer size */
494 err = kernctl_get_subbuf_size(infd, &len);
495 if (err != 0) {
496 ret = errno;
497 perror("Getting sub-buffer len failed.");
498 goto end;
499 }
500
501 /* write the subbuffer to the tracefile */
502 ret = on_read_subbuffer_mmap(kconsumerd_fd, len);
503 if (ret < 0) {
504 /*
505 * display the error but continue processing to try
506 * to release the subbuffer
507 */
508 ERR("Error writing to tracefile");
509 }
7d452e12
MD
510 break;
511 default:
03424a9b
JD
512 ERR("Unknown output method");
513 ret = -1;
d4a1283e
JD
514 }
515
516 err = kernctl_put_next_subbuf(infd);
517 if (err != 0) {
518 ret = errno;
519 if (errno == EFAULT) {
520 perror("Error in unreserving sub buffer\n");
521 } else if (errno == EIO) {
522 /* Should never happen with newer LTTng versions */
523 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
524 }
525 goto end;
526 }
527
528end:
529 return ret;
530}
531
532/*
533 * change_fd_state
534 *
535 * Update a fd according to what we just received
536 */
537static void change_fd_state(int sessiond_fd,
5dc18550 538 enum kconsumerd_fd_state state)
d4a1283e
JD
539{
540 struct ltt_kconsumerd_fd *iter;
541 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
542 if (iter->sessiond_fd == sessiond_fd) {
543 iter->state = state;
544 break;
545 }
546 }
547}
548
549/*
550 * consumerd_recv_fd
551 *
552 * Receives an array of file descriptors and the associated
553 * structures describing each fd (path name).
554 * Returns the size of received data
555 */
556static int consumerd_recv_fd(int sfd, int size,
5dc18550 557 enum kconsumerd_command cmd_type)
d4a1283e
JD
558{
559 struct msghdr msg;
560 struct iovec iov[1];
33a2b854 561 int ret = 0, i, tmp2;
d4a1283e
JD
562 struct cmsghdr *cmsg;
563 int nb_fd;
33a2b854
DG
564 char recv_fd[CMSG_SPACE(sizeof(int))];
565 struct lttcomm_kconsumerd_msg lkm;
566
d4a1283e 567 /* the number of fds we are about to receive */
33a2b854 568 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
d4a1283e 569
33a2b854
DG
570 for (i = 0; i < nb_fd; i++) {
571 memset(&msg, 0, sizeof(msg));
d4a1283e 572
33a2b854
DG
573 /* Prepare to receive the structures */
574 iov[0].iov_base = &lkm;
575 iov[0].iov_len = sizeof(lkm);
576 msg.msg_iov = iov;
577 msg.msg_iovlen = 1;
d4a1283e 578
33a2b854
DG
579 msg.msg_control = recv_fd;
580 msg.msg_controllen = sizeof(recv_fd);
d4a1283e 581
33a2b854
DG
582 DBG("Waiting to receive fd");
583 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
584 perror("recvmsg");
585 continue;
586 }
d4a1283e 587
33a2b854
DG
588 if (ret != (size / nb_fd)) {
589 ERR("Received only %d, expected %d", ret, size);
590 send_error(KCONSUMERD_ERROR_RECV_FD);
591 goto end;
592 }
d4a1283e 593
33a2b854
DG
594 cmsg = CMSG_FIRSTHDR(&msg);
595 if (!cmsg) {
596 ERR("Invalid control message header");
597 ret = -1;
598 send_error(KCONSUMERD_ERROR_RECV_FD);
599 goto end;
600 }
d4a1283e 601
33a2b854
DG
602 /* if we received fds */
603 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
d4a1283e 604 switch (cmd_type) {
5dc18550 605 case ADD_STREAM:
33a2b854
DG
606 DBG("add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
607 ret = add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
914a571b
JD
608 if (ret < 0) {
609 send_error(KCONSUMERD_OUTFD_ERROR);
610 goto end;
611 }
612 break;
5dc18550 613 case UPDATE_STREAM:
33a2b854 614 change_fd_state(lkm.fd, lkm.state);
914a571b
JD
615 break;
616 default:
617 break;
d4a1283e 618 }
33a2b854
DG
619 /* flag to tell the polling thread to update its fd array */
620 update_fd_array = 1;
621 /* signal the poll thread */
622 tmp2 = write(poll_pipe[1], "4", 1);
623 } else {
624 ERR("Didn't received any fd");
625 send_error(KCONSUMERD_ERROR_RECV_FD);
626 ret = -1;
627 goto end;
d4a1283e 628 }
d4a1283e
JD
629 }
630
631end:
9d26659a 632 DBG("consumerd_recv_fd thread exiting");
d4a1283e
JD
633 return ret;
634}
635
636/*
637 * thread_receive_fds
638 *
639 * This thread listens on the consumerd socket and
640 * receives the file descriptors from ltt-sessiond
641 */
642static void *thread_receive_fds(void *data)
643{
644 int sock, client_socket, ret;
645 struct lttcomm_kconsumerd_header tmp;
646
647 DBG("Creating command socket %s", command_sock_path);
648 unlink(command_sock_path);
649 client_socket = lttcomm_create_unix_sock(command_sock_path);
650 if (client_socket < 0) {
651 ERR("Cannot create command socket");
9d26659a 652 goto end;
d4a1283e
JD
653 }
654
655 ret = lttcomm_listen_unix_sock(client_socket);
656 if (ret < 0) {
9d26659a 657 goto end;
d4a1283e
JD
658 }
659
660 DBG("Sending ready command to ltt-sessiond");
661 ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
662 if (ret < 0) {
663 ERR("Error sending ready command to ltt-sessiond");
9d26659a 664 goto end;
d4a1283e
JD
665 }
666
7e8c38c6
JD
667 /* Blocking call, waiting for transmission */
668 sock = lttcomm_accept_unix_sock(client_socket);
669 if (sock <= 0) {
4abc0780 670 WARN("On accept");
9d26659a 671 goto end;
7e8c38c6 672 }
d4a1283e 673 while (1) {
d4a1283e
JD
674 /* We first get the number of fd we are about to receive */
675 ret = lttcomm_recv_unix_sock(sock, &tmp,
676 sizeof(struct lttcomm_kconsumerd_header));
6aea26bc 677 if (ret <= 0) {
9d26659a
JD
678 ERR("Communication interrupted on command socket");
679 goto end;
680 }
681 if (tmp.cmd_type == STOP) {
682 DBG("Received STOP command");
9d26659a 683 goto end;
d4a1283e 684 }
9d26659a 685 /* we received a command to add or update fds */
d4a1283e 686 ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
6aea26bc 687 if (ret <= 0) {
bcd8d9db 688 ERR("Receiving the FD, exiting");
9d26659a 689 goto end;
d4a1283e
JD
690 }
691 }
692
9d26659a
JD
693end:
694 DBG("thread_receive_fds exiting");
d13606b9
DG
695 quit = 1;
696 ret = write(poll_pipe[1], "4", 1);
697 if (ret < 0) {
698 perror("poll pipe write");
699 }
d4a1283e
JD
700 return NULL;
701}
702
703/*
704 * update_poll_array
705 *
706 * Allocate the pollfd structure and the local view of the out fds
707 * to avoid doing a lookup in the linked list and concurrency issues
708 * when writing is needed.
709 * Returns the number of fds in the structures
710 */
711static int update_poll_array(struct pollfd **pollfd,
712 struct ltt_kconsumerd_fd **local_kconsumerd_fd)
713{
714 struct ltt_kconsumerd_fd *iter;
715 int i = 0;
716
d4a1283e
JD
717
718 DBG("Updating poll fd array");
719 pthread_mutex_lock(&kconsumerd_lock_fds);
720
721 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
722 DBG("Inside for each");
723 if (iter->state == ACTIVE_FD) {
724 DBG("Active FD %d", iter->consumerd_fd);
1b686c3f
JD
725 (*pollfd)[i].fd = iter->consumerd_fd;
726 (*pollfd)[i].events = POLLIN | POLLPRI;
d4a1283e
JD
727 local_kconsumerd_fd[i] = iter;
728 i++;
d4a1283e
JD
729 }
730 }
252fd492
JD
731 /*
732 * insert the poll_pipe at the end of the array and don't increment i
733 * so nb_fd is the number of real FD
734 */
735 (*pollfd)[i].fd = poll_pipe[0];
736 (*pollfd)[i].events = POLLIN;
737
d4a1283e
JD
738 update_fd_array = 0;
739 pthread_mutex_unlock(&kconsumerd_lock_fds);
740 return i;
741
d4a1283e
JD
742}
743
744/*
745 * thread_poll_fds
746 *
747 * This thread polls the fds in the ltt_fd_list to consume the data
748 * and write it to tracefile if necessary.
749 */
750static void *thread_poll_fds(void *data)
751{
752 int num_rdy, num_hup, high_prio, ret, i;
753 struct pollfd *pollfd = NULL;
754 /* local view of the fds */
6aea26bc 755 struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL;
d4a1283e
JD
756 /* local view of fds_count */
757 int nb_fd = 0;
252fd492
JD
758 char tmp;
759 int tmp2;
d4a1283e
JD
760
761 ret = pipe(thread_pipe);
762 if (ret < 0) {
763 perror("Error creating pipe");
764 goto end;
765 }
766
6aea26bc
JD
767 local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
768
d4a1283e
JD
769 while (1) {
770 high_prio = 0;
771 num_hup = 0;
772
773 /*
774 * the ltt_fd_list has been updated, we need to update our
775 * local array as well
776 */
914a571b 777 if (update_fd_array == 1) {
4abc0780
JD
778 if (pollfd != NULL) {
779 free(pollfd);
780 pollfd = NULL;
781 }
782 if (local_kconsumerd_fd != NULL) {
783 free(local_kconsumerd_fd);
784 local_kconsumerd_fd = NULL;
785 }
786 /* allocate for all fds + 1 for the poll_pipe */
787 pollfd = malloc((fds_count + 1) * sizeof(struct pollfd));
788 if (pollfd == NULL) {
789 perror("pollfd malloc");
790 goto end;
791 }
792 /* allocate for all fds + 1 for the poll_pipe */
793 local_kconsumerd_fd = malloc((fds_count + 1) * sizeof(struct ltt_kconsumerd_fd));
794 if (local_kconsumerd_fd == NULL) {
795 perror("local_kconsumerd_fd malloc");
796 goto end;
797 }
798
6aea26bc 799 ret = update_poll_array(&pollfd, local_kconsumerd_fd);
d4a1283e
JD
800 if (ret < 0) {
801 ERR("Error in allocating pollfd or local_outfds");
802 send_error(KCONSUMERD_POLL_ERROR);
803 goto end;
804 }
805 nb_fd = ret;
806 }
807
808 /* poll on the array of fds */
252fd492
JD
809 DBG("polling on %d fd", nb_fd + 1);
810 num_rdy = poll(pollfd, nb_fd + 1, -1);
d4a1283e
JD
811 DBG("poll num_rdy : %d", num_rdy);
812 if (num_rdy == -1) {
813 perror("Poll error");
814 send_error(KCONSUMERD_POLL_ERROR);
815 goto end;
816 }
817
d13606b9
DG
818 /* No FDs and quit, cleanup the thread */
819 if (nb_fd == 0 && quit == 1) {
820 goto end;
821 }
822
252fd492
JD
823 /*
824 * if only the poll_pipe triggered poll to return just return to the
825 * beginning of the loop to update the array
826 */
827 if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
828 DBG("poll_pipe wake up");
829 tmp2 = read(poll_pipe[0], &tmp, 1);
830 continue;
831 }
832
d4a1283e
JD
833 /* Take care of high priority channels first. */
834 for (i = 0; i < nb_fd; i++) {
835 switch(pollfd[i].revents) {
914a571b
JD
836 case POLLERR:
837 ERR("Error returned in polling fd %d.", pollfd[i].fd);
9d26659a
JD
838 del_fd(local_kconsumerd_fd[i]);
839 update_fd_array = 1;
914a571b 840 num_hup++;
914a571b
JD
841 break;
842 case POLLHUP:
843 ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
9d26659a
JD
844 del_fd(local_kconsumerd_fd[i]);
845 update_fd_array = 1;
914a571b
JD
846 num_hup++;
847 break;
848 case POLLNVAL:
849 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
9d26659a
JD
850 del_fd(local_kconsumerd_fd[i]);
851 update_fd_array = 1;
914a571b
JD
852 num_hup++;
853 break;
854 case POLLPRI:
855 DBG("Urgent read on fd %d", pollfd[i].fd);
856 high_prio = 1;
857 ret = read_subbuffer(local_kconsumerd_fd[i]);
858 /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
859 if (ret == EAGAIN) {
860 ret = 0;
861 }
862 break;
d4a1283e
JD
863 }
864 }
865
866 /* If every buffer FD has hung up, we end the read loop here */
867 if (nb_fd > 0 && num_hup == nb_fd) {
868 DBG("every buffer FD has hung up\n");
9d26659a
JD
869 if (quit == 1) {
870 goto end;
871 }
872 continue;
d4a1283e
JD
873 }
874
875 /* Take care of low priority channels. */
914a571b 876 if (high_prio == 0) {
d4a1283e 877 for (i = 0; i < nb_fd; i++) {
914a571b
JD
878 if (pollfd[i].revents == POLLIN) {
879 DBG("Normal read on fd %d", pollfd[i].fd);
880 ret = read_subbuffer(local_kconsumerd_fd[i]);
881 /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
882 if (ret == EAGAIN) {
883 ret = 0;
884 }
d4a1283e
JD
885 }
886 }
887 }
888 }
889end:
9d26659a 890 DBG("polling thread exiting");
d4a1283e
JD
891 if (pollfd != NULL) {
892 free(pollfd);
893 pollfd = NULL;
894 }
895 if (local_kconsumerd_fd != NULL) {
896 free(local_kconsumerd_fd);
897 local_kconsumerd_fd = NULL;
898 }
bcd8d9db 899 cleanup();
d4a1283e
JD
900 return NULL;
901}
902
903/*
904 * usage function on stderr
905 */
906static void usage(void)
907{
908 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
909 fprintf(stderr, " -h, --help "
910 "Display this usage.\n");
911 fprintf(stderr, " -c, --kconsumerd-cmd-sock PATH "
912 "Specify path for the command socket\n");
913 fprintf(stderr, " -e, --kconsumerd-err-sock PATH "
914 "Specify path for the error socket\n");
915 fprintf(stderr, " -d, --daemonize "
916 "Start as a daemon.\n");
917 fprintf(stderr, " -q, --quiet "
918 "No output at all.\n");
919 fprintf(stderr, " -v, --verbose "
920 "Verbose mode. Activate DBG() macro.\n");
921 fprintf(stderr, " -V, --version "
922 "Show version number.\n");
923}
924
925/*
926 * daemon argument parsing
927 */
928static void parse_args(int argc, char **argv)
929{
930 int c;
931
932 static struct option long_options[] = {
933 { "kconsumerd-cmd-sock", 1, 0, 'c' },
934 { "kconsumerd-err-sock", 1, 0, 'e' },
935 { "daemonize", 0, 0, 'd' },
936 { "help", 0, 0, 'h' },
937 { "quiet", 0, 0, 'q' },
938 { "verbose", 0, 0, 'v' },
939 { "version", 0, 0, 'V' },
940 { NULL, 0, 0, 0 }
941 };
942
943 while (1) {
944 int option_index = 0;
945 c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index);
946 if (c == -1) {
947 break;
948 }
949
950 switch (c) {
914a571b
JD
951 case 0:
952 fprintf(stderr, "option %s", long_options[option_index].name);
953 if (optarg) {
954 fprintf(stderr, " with arg %s\n", optarg);
955 }
956 break;
957 case 'c':
958 snprintf(command_sock_path, PATH_MAX, "%s", optarg);
959 break;
960 case 'e':
961 snprintf(error_sock_path, PATH_MAX, "%s", optarg);
962 break;
963 case 'd':
964 opt_daemon = 1;
965 break;
966 case 'h':
967 usage();
968 exit(EXIT_FAILURE);
969 case 'q':
970 opt_quiet = 1;
971 break;
972 case 'v':
973 opt_verbose = 1;
974 break;
975 case 'V':
976 fprintf(stdout, "%s\n", VERSION);
977 exit(EXIT_SUCCESS);
978 default:
979 usage();
980 exit(EXIT_FAILURE);
d4a1283e
JD
981 }
982 }
983}
984
985
986/*
987 * main
988 */
989int main(int argc, char **argv)
990{
991 int i;
992 int ret = 0;
993 void *status;
994
995 /* Parse arguments */
996 progname = argv[0];
997 parse_args(argc, argv);
998
999 /* Daemonize */
1000 if (opt_daemon) {
1001 ret = daemon(0, 0);
1002 if (ret < 0) {
1003 perror("daemon");
1004 goto error;
1005 }
1006 }
1007
1008 if (strlen(command_sock_path) == 0) {
1009 snprintf(command_sock_path, PATH_MAX,
1010 KCONSUMERD_CMD_SOCK_PATH);
1011 }
1012 if (strlen(error_sock_path) == 0) {
1013 snprintf(error_sock_path, PATH_MAX,
1014 KCONSUMERD_ERR_SOCK_PATH);
1015 }
1016
1017 if (set_signal_handler() < 0) {
1018 goto error;
1019 }
1020
252fd492
JD
1021 /* create the pipe to wake to polling thread when needed */
1022 ret = pipe(poll_pipe);
1023 if (ret < 0) {
1024 perror("Error creating poll pipe");
1025 goto end;
1026 }
1027
d4a1283e
JD
1028 /* Connect to the socket created by ltt-sessiond to report errors */
1029 DBG("Connecting to error socket %s", error_sock_path);
1030 error_socket = lttcomm_connect_unix_sock(error_sock_path);
1031 /* not a fatal error, but all communication with ltt-sessiond will fail */
1032 if (error_socket < 0) {
1033 WARN("Cannot connect to error socket, is ltt-sessiond started ?");
1034 }
1035
1036 /* Create the thread to manage the receive of fd */
1037 ret = pthread_create(&threads[0], NULL, thread_receive_fds, (void *) NULL);
1038 if (ret != 0) {
1039 perror("pthread_create");
1040 goto error;
1041 }
1042
1043 /* Create thread to manage the polling/writing of traces */
1044 ret = pthread_create(&threads[1], NULL, thread_poll_fds, (void *) NULL);
1045 if (ret != 0) {
1046 perror("pthread_create");
1047 goto error;
1048 }
1049
1050 for (i = 0; i < 2; i++) {
1051 ret = pthread_join(threads[i], &status);
1052 if (ret != 0) {
1053 perror("pthread_join");
1054 goto error;
1055 }
1056 }
1057 ret = EXIT_SUCCESS;
1058 send_error(KCONSUMERD_EXIT_SUCCESS);
1059 goto end;
1060
1061error:
1062 ret = EXIT_FAILURE;
1063 send_error(KCONSUMERD_EXIT_FAILURE);
1064
1065end:
1066 cleanup();
1067
1068 return ret;
1069}
This page took 0.064707 seconds and 4 git commands to generate.