52266f9eb2a5f18971acbd0b531f8ff4b0c0851e
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <poll.h>
12 #include <pthread.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <sys/mman.h>
16 #include <sys/socket.h>
17 #include <sys/types.h>
18 #include <inttypes.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21 #include <stdint.h>
22
23 #include <bin/lttng-consumerd/health-consumerd.h>
24 #include <common/common.h>
25 #include <common/kernel-ctl/kernel-ctl.h>
26 #include <common/sessiond-comm/sessiond-comm.h>
27 #include <common/sessiond-comm/relayd.h>
28 #include <common/compat/fcntl.h>
29 #include <common/compat/endian.h>
30 #include <common/pipe.h>
31 #include <common/relayd/relayd.h>
32 #include <common/utils.h>
33 #include <common/consumer/consumer-stream.h>
34 #include <common/index/index.h>
35 #include <common/consumer/consumer-timer.h>
36 #include <common/optional.h>
37 #include <common/buffer-view.h>
38 #include <common/consumer/consumer.h>
39 #include <common/consumer/metadata-bucket.h>
40
41 #include "kernel-consumer.h"
42
43 extern struct lttng_consumer_global_data the_consumer_data;
44 extern int consumer_poll_timeout;
45
46 /*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
51 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
52 {
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
62 PERROR("Getting sub-buffer snapshot.");
63 }
64
65 return ret;
66 }
67
68 /*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73 int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75 {
76 LTTNG_ASSERT(stream);
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79 }
80
81 /*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
86 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
87 unsigned long *pos)
88 {
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
94 PERROR("kernctl_snapshot_get_produced");
95 }
96
97 return ret;
98 }
99
100 /*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107 {
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
113 PERROR("kernctl_snapshot_get_consumed");
114 }
115
116 return ret;
117 }
118
119 static
120 int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122 {
123 int ret;
124 unsigned long mmap_offset;
125 const char *mmap_base = (const char *) stream->mmap_base;
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134 error:
135 return ret;
136 }
137
138 /*
139 * Take a snapshot of all the stream of a channel
140 * RCU read-side lock must be held across this function to ensure existence of
141 * channel.
142 *
143 * Returns 0 on success, < 0 on error
144 */
/*
 * NOTE(review): the 'ctx' parameter is not referenced anywhere in this
 * function's body; it appears to be kept for signature symmetry with the
 * other snapshot helpers — confirm before removing.
 */
static int lttng_kconsumer_snapshot_channel(
		struct lttng_consumer_channel *channel,
		uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_consumer_stream *stream;

	DBG("Kernel consumer snapshot channel %" PRIu64, key);

	/* Prevent channel modifications while we perform the snapshot.*/
	pthread_mutex_lock(&channel->lock);

	rcu_read_lock();

	/* Splice is not supported yet for channel snapshot. */
	if (channel->output != CONSUMER_CHANNEL_MMAP) {
		ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
				channel->name);
		ret = -1;
		goto end;
	}

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		unsigned long consumed_pos, produced_pos;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		LTTNG_ASSERT(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto end_unlock;
		}
		LTTNG_ASSERT(!stream->trace_chunk);
		stream->trace_chunk = channel->trace_chunk;

		/*
		 * Assign the received relayd ID so we can use it for streaming. The streams
		 * are not visible to anyone so this is OK to change it.
		 */
		stream->net_seq_idx = relayd_id;
		channel->relayd_id = relayd_id;
		if (relayd_id != (uint64_t) -1ULL) {
			/* Streaming to a relay daemon: announce the stream first. */
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				ERR("sending stream to relayd");
				goto end_unlock;
			}
		} else {
			/* Local snapshot: create the on-disk output files. */
			ret = consumer_stream_create_output_files(stream,
					false);
			if (ret < 0) {
				goto end_unlock;
			}
			DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
					stream->key);
		}

		/*
		 * Flush the buffer, discarding an empty final packet if
		 * possible so the snapshot only contains real data.
		 */
		ret = kernctl_buffer_flush_empty(stream->wait_fd);
		if (ret < 0) {
			/*
			 * Doing a buffer flush which does not take into
			 * account empty packets. This is not perfect
			 * for stream intersection, but required as a
			 * fall-back when "flush_empty" is not
			 * implemented by lttng-modules.
			 */
			ret = kernctl_buffer_flush(stream->wait_fd);
			if (ret < 0) {
				ERR("Failed to flush kernel stream");
				goto end_unlock;
			}
			goto end_unlock;
		}

		ret = lttng_kconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking kernel snapshot");
			goto end_unlock;
		}

		/* Sample both ends of the data window to copy out. */
		ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced kernel snapshot position");
			goto end_unlock;
		}

		ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd kernel snapshot position");
			goto end_unlock;
		}

		/*
		 * Move the start position forward when only the last
		 * nb_packets_per_stream packets were requested.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/*
		 * Copy each sub-buffer between consumed and produced positions.
		 * The signed subtraction handles position counter wrap-around.
		 */
		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();
			DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);

			ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("kernctl_get_subbuf snapshot");
					goto end_unlock;
				}
				/* Sub-buffer unavailable: count it as lost and move on. */
				DBG("Kernel consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(
					stream, &subbuf_view,
					padded_len - len);
			/*
			 * We write the padded len in local tracefiles but the data len
			 * when using a relay. Display the error but continue processing
			 * to try to release the subbuffer.
			 */
			if (relayd_id != (uint64_t) -1ULL) {
				if (read_len != len) {
					ERR("Error sending to the relay (ret: %zd != len: %lu)",
							read_len, len);
				}
			} else {
				if (read_len != padded_len) {
					ERR("Error writing to tracefile (ret: %zd != len: %lu)",
							read_len, padded_len);
				}
			}

			/* Release the sub-buffer back to the kernel tracer. */
			ret = kernctl_put_subbuf(stream->wait_fd);
			if (ret < 0) {
				ERR("Snapshot kernctl_put_subbuf");
				goto end_unlock;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Tear down this stream's output now that the copy is done. */
		if (relayd_id == (uint64_t) -1ULL) {
			if (stream->out_fd >= 0) {
				ret = close(stream->out_fd);
				if (ret < 0) {
					PERROR("Kernel consumer snapshot close out_fd");
					goto end_unlock;
				}
				stream->out_fd = -1;
			}
		} else {
			close_relayd_stream(stream);
			stream->net_seq_idx = (uint64_t) -1ULL;
		}
		/* Drop the reference taken on the channel's trace chunk above. */
		lttng_trace_chunk_put(stream->trace_chunk);
		stream->trace_chunk = NULL;
		pthread_mutex_unlock(&stream->lock);
	}

	/* All good! */
	ret = 0;
	goto end;

error_put_subbuf:
	/* Best-effort release of the sub-buffer acquired above; keep the
	 * original error in 'ret' semantics as before (ret is overwritten
	 * only if the put itself fails). */
	ret = kernctl_put_subbuf(stream->wait_fd);
	if (ret < 0) {
		ERR("Snapshot kernctl_put_subbuf error path");
	}
end_unlock:
	pthread_mutex_unlock(&stream->lock);
end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->lock);
	return ret;
}
355
356 /*
357 * Read the whole metadata available for a snapshot.
358 * RCU read-side lock must be held across this function to ensure existence of
359 * metadata_channel.
360 *
361 * Returns 0 on success, < 0 on error
362 */
static int lttng_kconsumer_snapshot_metadata(
		struct lttng_consumer_channel *metadata_channel,
		uint64_t key, char *path, uint64_t relayd_id,
		struct lttng_consumer_local_data *ctx)
{
	int ret, use_relayd = 0;
	ssize_t ret_read;
	struct lttng_consumer_stream *metadata_stream;

	LTTNG_ASSERT(ctx);

	DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
			key, path);

	rcu_read_lock();

	metadata_stream = metadata_channel->metadata_stream;
	LTTNG_ASSERT(metadata_stream);

	/* Serialize against other readers of the metadata stream. */
	metadata_stream->read_subbuffer_ops.lock(metadata_stream);
	LTTNG_ASSERT(metadata_channel->trace_chunk);
	LTTNG_ASSERT(metadata_stream->trace_chunk);

	/* Flag once that we have a valid relayd for the stream. */
	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	if (use_relayd) {
		/* Announce the metadata stream to the relay daemon. */
		ret = consumer_send_relayd_stream(metadata_stream, path);
		if (ret < 0) {
			goto error_snapshot;
		}
	} else {
		/* Local snapshot: create the on-disk output files. */
		ret = consumer_stream_create_output_files(metadata_stream,
				false);
		if (ret < 0) {
			goto error_snapshot;
		}
	}

	/* Drain the metadata stream until no data remains. */
	do {
		health_code_update();

		ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
		if (ret_read < 0) {
			ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
					ret_read);
			/* NOTE(review): narrows ssize_t to int; error codes fit in int. */
			ret = ret_read;
			goto error_snapshot;
		}
	} while (ret_read > 0);

	if (use_relayd) {
		close_relayd_stream(metadata_stream);
		metadata_stream->net_seq_idx = (uint64_t) -1ULL;
	} else {
		if (metadata_stream->out_fd >= 0) {
			ret = close(metadata_stream->out_fd);
			if (ret < 0) {
				PERROR("Kernel consumer snapshot metadata close out_fd");
				/*
				 * Don't go on error here since the snapshot was successful at this
				 * point but somehow the close failed.
				 */
			}
			metadata_stream->out_fd = -1;
			lttng_trace_chunk_put(metadata_stream->trace_chunk);
			metadata_stream->trace_chunk = NULL;
		}
	}

	ret = 0;
error_snapshot:
	/* Unlock, then destroy the snapshot-only metadata stream. */
	metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
	cds_list_del(&metadata_stream->send_node);
	consumer_stream_destroy(metadata_stream, NULL);
	metadata_channel->metadata_stream = NULL;
	rcu_read_unlock();
	return ret;
}
444
445 /*
446 * Receive command from session daemon and process it.
447 *
448 * Return 1 on success else a negative value or 0.
449 */
450 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
451 int sock, struct pollfd *consumer_sockpoll)
452 {
453 int ret_func;
454 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
455 struct lttcomm_consumer_msg msg;
456
457 health_code_update();
458
459 {
460 ssize_t ret_recv;
461
462 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
463 if (ret_recv != sizeof(msg)) {
464 if (ret_recv > 0) {
465 lttng_consumer_send_error(ctx,
466 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
467 ret_recv = -1;
468 }
469 return ret_recv;
470 }
471 }
472
473 health_code_update();
474
475 /* Deprecated command */
476 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
477
478 health_code_update();
479
480 /* relayd needs RCU read-side protection */
481 rcu_read_lock();
482
483 switch (msg.cmd_type) {
484 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
485 {
486 uint32_t major = msg.u.relayd_sock.major;
487 uint32_t minor = msg.u.relayd_sock.minor;
488 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
489 msg.u.relayd_sock.relayd_socket_protocol;
490
491 /* Session daemon status message are handled in the following call. */
492 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
493 msg.u.relayd_sock.type, ctx, sock,
494 consumer_sockpoll, msg.u.relayd_sock.session_id,
495 msg.u.relayd_sock.relayd_session_id, major,
496 minor, protocol);
497 goto end_nosignal;
498 }
499 case LTTNG_CONSUMER_ADD_CHANNEL:
500 {
501 struct lttng_consumer_channel *new_channel;
502 int ret_send_status, ret_add_channel = 0;
503 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
504
505 health_code_update();
506
507 /* First send a status message before receiving the fds. */
508 ret_send_status = consumer_send_status_msg(sock, ret_code);
509 if (ret_send_status < 0) {
510 /* Somehow, the session daemon is not responding anymore. */
511 goto error_fatal;
512 }
513
514 health_code_update();
515
516 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
517 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
518 msg.u.channel.session_id,
519 msg.u.channel.chunk_id.is_set ?
520 &chunk_id : NULL,
521 msg.u.channel.pathname,
522 msg.u.channel.name,
523 msg.u.channel.relayd_id, msg.u.channel.output,
524 msg.u.channel.tracefile_size,
525 msg.u.channel.tracefile_count, 0,
526 msg.u.channel.monitor,
527 msg.u.channel.live_timer_interval,
528 msg.u.channel.is_live,
529 NULL, NULL);
530 if (new_channel == NULL) {
531 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
532 goto end_nosignal;
533 }
534 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
535 switch (msg.u.channel.output) {
536 case LTTNG_EVENT_SPLICE:
537 new_channel->output = CONSUMER_CHANNEL_SPLICE;
538 break;
539 case LTTNG_EVENT_MMAP:
540 new_channel->output = CONSUMER_CHANNEL_MMAP;
541 break;
542 default:
543 ERR("Channel output unknown %d", msg.u.channel.output);
544 goto end_nosignal;
545 }
546
547 /* Translate and save channel type. */
548 switch (msg.u.channel.type) {
549 case CONSUMER_CHANNEL_TYPE_DATA:
550 case CONSUMER_CHANNEL_TYPE_METADATA:
551 new_channel->type = (consumer_channel_type) msg.u.channel.type;
552 break;
553 default:
554 abort();
555 goto end_nosignal;
556 };
557
558 health_code_update();
559
560 if (ctx->on_recv_channel != NULL) {
561 int ret_recv_channel =
562 ctx->on_recv_channel(new_channel);
563 if (ret_recv_channel == 0) {
564 ret_add_channel = consumer_add_channel(
565 new_channel, ctx);
566 } else if (ret_recv_channel < 0) {
567 goto end_nosignal;
568 }
569 } else {
570 ret_add_channel =
571 consumer_add_channel(new_channel, ctx);
572 }
573 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
574 !ret_add_channel) {
575 int monitor_start_ret;
576
577 DBG("Consumer starting monitor timer");
578 consumer_timer_live_start(new_channel,
579 msg.u.channel.live_timer_interval);
580 monitor_start_ret = consumer_timer_monitor_start(
581 new_channel,
582 msg.u.channel.monitor_timer_interval);
583 if (monitor_start_ret < 0) {
584 ERR("Starting channel monitoring timer failed");
585 goto end_nosignal;
586 }
587 }
588
589 health_code_update();
590
591 /* If we received an error in add_channel, we need to report it. */
592 if (ret_add_channel < 0) {
593 ret_send_status = consumer_send_status_msg(
594 sock, ret_add_channel);
595 if (ret_send_status < 0) {
596 goto error_fatal;
597 }
598 goto end_nosignal;
599 }
600
601 goto end_nosignal;
602 }
603 case LTTNG_CONSUMER_ADD_STREAM:
604 {
605 int fd;
606 struct lttng_pipe *stream_pipe;
607 struct lttng_consumer_stream *new_stream;
608 struct lttng_consumer_channel *channel;
609 int alloc_ret = 0;
610 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
611 ssize_t ret_pipe_write, ret_recv;
612
613 /*
614 * Get stream's channel reference. Needed when adding the stream to the
615 * global hash table.
616 */
617 channel = consumer_find_channel(msg.u.stream.channel_key);
618 if (!channel) {
619 /*
620 * We could not find the channel. Can happen if cpu hotplug
621 * happens while tearing down.
622 */
623 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
624 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
625 }
626
627 health_code_update();
628
629 /* First send a status message before receiving the fds. */
630 ret_send_status = consumer_send_status_msg(sock, ret_code);
631 if (ret_send_status < 0) {
632 /* Somehow, the session daemon is not responding anymore. */
633 goto error_add_stream_fatal;
634 }
635
636 health_code_update();
637
638 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
639 /* Channel was not found. */
640 goto error_add_stream_nosignal;
641 }
642
643 /* Blocking call */
644 health_poll_entry();
645 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
646 health_poll_exit();
647 if (ret_poll) {
648 goto error_add_stream_fatal;
649 }
650
651 health_code_update();
652
653 /* Get stream file descriptor from socket */
654 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
655 if (ret_recv != sizeof(fd)) {
656 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
657 ret_func = ret_recv;
658 goto end;
659 }
660
661 health_code_update();
662
663 /*
664 * Send status code to session daemon only if the recv works. If the
665 * above recv() failed, the session daemon is notified through the
666 * error socket and the teardown is eventually done.
667 */
668 ret_send_status = consumer_send_status_msg(sock, ret_code);
669 if (ret_send_status < 0) {
670 /* Somehow, the session daemon is not responding anymore. */
671 goto error_add_stream_nosignal;
672 }
673
674 health_code_update();
675
676 pthread_mutex_lock(&channel->lock);
677 new_stream = consumer_stream_create(
678 channel,
679 channel->key,
680 fd,
681 channel->name,
682 channel->relayd_id,
683 channel->session_id,
684 channel->trace_chunk,
685 msg.u.stream.cpu,
686 &alloc_ret,
687 channel->type,
688 channel->monitor);
689 if (new_stream == NULL) {
690 switch (alloc_ret) {
691 case -ENOMEM:
692 case -EINVAL:
693 default:
694 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
695 break;
696 }
697 pthread_mutex_unlock(&channel->lock);
698 goto error_add_stream_nosignal;
699 }
700
701 new_stream->wait_fd = fd;
702 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
703 new_stream->wait_fd, &new_stream->max_sb_size);
704 if (ret_get_max_subbuf_size < 0) {
705 pthread_mutex_unlock(&channel->lock);
706 ERR("Failed to get kernel maximal subbuffer size");
707 goto error_add_stream_nosignal;
708 }
709
710 consumer_stream_update_channel_attributes(new_stream,
711 channel);
712
713 /*
714 * We've just assigned the channel to the stream so increment the
715 * refcount right now. We don't need to increment the refcount for
716 * streams in no monitor because we handle manually the cleanup of
717 * those. It is very important to make sure there is NO prior
718 * consumer_del_stream() calls or else the refcount will be unbalanced.
719 */
720 if (channel->monitor) {
721 uatomic_inc(&new_stream->chan->refcount);
722 }
723
724 /*
725 * The buffer flush is done on the session daemon side for the kernel
726 * so no need for the stream "hangup_flush_done" variable to be
727 * tracked. This is important for a kernel stream since we don't rely
728 * on the flush state of the stream to read data. It's not the case for
729 * user space tracing.
730 */
731 new_stream->hangup_flush_done = 0;
732
733 health_code_update();
734
735 pthread_mutex_lock(&new_stream->lock);
736 if (ctx->on_recv_stream) {
737 int ret_recv_stream = ctx->on_recv_stream(new_stream);
738 if (ret_recv_stream < 0) {
739 pthread_mutex_unlock(&new_stream->lock);
740 pthread_mutex_unlock(&channel->lock);
741 consumer_stream_free(new_stream);
742 goto error_add_stream_nosignal;
743 }
744 }
745 health_code_update();
746
747 if (new_stream->metadata_flag) {
748 channel->metadata_stream = new_stream;
749 }
750
751 /* Do not monitor this stream. */
752 if (!channel->monitor) {
753 DBG("Kernel consumer add stream %s in no monitor mode with "
754 "relayd id %" PRIu64, new_stream->name,
755 new_stream->net_seq_idx);
756 cds_list_add(&new_stream->send_node, &channel->streams.head);
757 pthread_mutex_unlock(&new_stream->lock);
758 pthread_mutex_unlock(&channel->lock);
759 goto end_add_stream;
760 }
761
762 /* Send stream to relayd if the stream has an ID. */
763 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
764 int ret_send_relayd_stream;
765
766 ret_send_relayd_stream = consumer_send_relayd_stream(
767 new_stream, new_stream->chan->pathname);
768 if (ret_send_relayd_stream < 0) {
769 pthread_mutex_unlock(&new_stream->lock);
770 pthread_mutex_unlock(&channel->lock);
771 consumer_stream_free(new_stream);
772 goto error_add_stream_nosignal;
773 }
774
775 /*
776 * If adding an extra stream to an already
777 * existing channel (e.g. cpu hotplug), we need
778 * to send the "streams_sent" command to relayd.
779 */
780 if (channel->streams_sent_to_relayd) {
781 int ret_send_relayd_streams_sent;
782
783 ret_send_relayd_streams_sent =
784 consumer_send_relayd_streams_sent(
785 new_stream->net_seq_idx);
786 if (ret_send_relayd_streams_sent < 0) {
787 pthread_mutex_unlock(&new_stream->lock);
788 pthread_mutex_unlock(&channel->lock);
789 goto error_add_stream_nosignal;
790 }
791 }
792 }
793 pthread_mutex_unlock(&new_stream->lock);
794 pthread_mutex_unlock(&channel->lock);
795
796 /* Get the right pipe where the stream will be sent. */
797 if (new_stream->metadata_flag) {
798 consumer_add_metadata_stream(new_stream);
799 stream_pipe = ctx->consumer_metadata_pipe;
800 } else {
801 consumer_add_data_stream(new_stream);
802 stream_pipe = ctx->consumer_data_pipe;
803 }
804
805 /* Visible to other threads */
806 new_stream->globally_visible = 1;
807
808 health_code_update();
809
810 ret_pipe_write = lttng_pipe_write(
811 stream_pipe, &new_stream, sizeof(new_stream));
812 if (ret_pipe_write < 0) {
813 ERR("Consumer write %s stream to pipe %d",
814 new_stream->metadata_flag ? "metadata" : "data",
815 lttng_pipe_get_writefd(stream_pipe));
816 if (new_stream->metadata_flag) {
817 consumer_del_stream_for_metadata(new_stream);
818 } else {
819 consumer_del_stream_for_data(new_stream);
820 }
821 goto error_add_stream_nosignal;
822 }
823
824 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
825 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
826 end_add_stream:
827 break;
828 error_add_stream_nosignal:
829 goto end_nosignal;
830 error_add_stream_fatal:
831 goto error_fatal;
832 }
833 case LTTNG_CONSUMER_STREAMS_SENT:
834 {
835 struct lttng_consumer_channel *channel;
836 int ret_send_status;
837
838 /*
839 * Get stream's channel reference. Needed when adding the stream to the
840 * global hash table.
841 */
842 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
843 if (!channel) {
844 /*
845 * We could not find the channel. Can happen if cpu hotplug
846 * happens while tearing down.
847 */
848 ERR("Unable to find channel key %" PRIu64,
849 msg.u.sent_streams.channel_key);
850 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
851 }
852
853 health_code_update();
854
855 /*
856 * Send status code to session daemon.
857 */
858 ret_send_status = consumer_send_status_msg(sock, ret_code);
859 if (ret_send_status < 0 ||
860 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
861 /* Somehow, the session daemon is not responding anymore. */
862 goto error_streams_sent_nosignal;
863 }
864
865 health_code_update();
866
867 /*
868 * We should not send this message if we don't monitor the
869 * streams in this channel.
870 */
871 if (!channel->monitor) {
872 goto end_error_streams_sent;
873 }
874
875 health_code_update();
876 /* Send stream to relayd if the stream has an ID. */
877 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
878 int ret_send_relay_streams;
879
880 ret_send_relay_streams = consumer_send_relayd_streams_sent(
881 msg.u.sent_streams.net_seq_idx);
882 if (ret_send_relay_streams < 0) {
883 goto error_streams_sent_nosignal;
884 }
885 channel->streams_sent_to_relayd = true;
886 }
887 end_error_streams_sent:
888 break;
889 error_streams_sent_nosignal:
890 goto end_nosignal;
891 }
892 case LTTNG_CONSUMER_UPDATE_STREAM:
893 {
894 rcu_read_unlock();
895 return -ENOSYS;
896 }
897 case LTTNG_CONSUMER_DESTROY_RELAYD:
898 {
899 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
900 struct consumer_relayd_sock_pair *relayd;
901 int ret_send_status;
902
903 DBG("Kernel consumer destroying relayd %" PRIu64, index);
904
905 /* Get relayd reference if exists. */
906 relayd = consumer_find_relayd(index);
907 if (relayd == NULL) {
908 DBG("Unable to find relayd %" PRIu64, index);
909 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
910 }
911
912 /*
913 * Each relayd socket pair has a refcount of stream attached to it
914 * which tells if the relayd is still active or not depending on the
915 * refcount value.
916 *
917 * This will set the destroy flag of the relayd object and destroy it
918 * if the refcount reaches zero when called.
919 *
920 * The destroy can happen either here or when a stream fd hangs up.
921 */
922 if (relayd) {
923 consumer_flag_relayd_for_destroy(relayd);
924 }
925
926 health_code_update();
927
928 ret_send_status = consumer_send_status_msg(sock, ret_code);
929 if (ret_send_status < 0) {
930 /* Somehow, the session daemon is not responding anymore. */
931 goto error_fatal;
932 }
933
934 goto end_nosignal;
935 }
936 case LTTNG_CONSUMER_DATA_PENDING:
937 {
938 int32_t ret_data_pending;
939 uint64_t id = msg.u.data_pending.session_id;
940 ssize_t ret_send;
941
942 DBG("Kernel consumer data pending command for id %" PRIu64, id);
943
944 ret_data_pending = consumer_data_pending(id);
945
946 health_code_update();
947
948 /* Send back returned value to session daemon */
949 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
950 sizeof(ret_data_pending));
951 if (ret_send < 0) {
952 PERROR("send data pending ret code");
953 goto error_fatal;
954 }
955
956 /*
957 * No need to send back a status message since the data pending
958 * returned value is the response.
959 */
960 break;
961 }
962 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
963 {
964 struct lttng_consumer_channel *channel;
965 uint64_t key = msg.u.snapshot_channel.key;
966 int ret_send_status;
967
968 channel = consumer_find_channel(key);
969 if (!channel) {
970 ERR("Channel %" PRIu64 " not found", key);
971 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
972 } else {
973 if (msg.u.snapshot_channel.metadata == 1) {
974 int ret_snapshot;
975
976 ret_snapshot = lttng_kconsumer_snapshot_metadata(
977 channel, key,
978 msg.u.snapshot_channel.pathname,
979 msg.u.snapshot_channel.relayd_id,
980 ctx);
981 if (ret_snapshot < 0) {
982 ERR("Snapshot metadata failed");
983 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
984 }
985 } else {
986 int ret_snapshot;
987
988 ret_snapshot = lttng_kconsumer_snapshot_channel(
989 channel, key,
990 msg.u.snapshot_channel.pathname,
991 msg.u.snapshot_channel.relayd_id,
992 msg.u.snapshot_channel
993 .nb_packets_per_stream,
994 ctx);
995 if (ret_snapshot < 0) {
996 ERR("Snapshot channel failed");
997 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
998 }
999 }
1000 }
1001 health_code_update();
1002
1003 ret_send_status = consumer_send_status_msg(sock, ret_code);
1004 if (ret_send_status < 0) {
1005 /* Somehow, the session daemon is not responding anymore. */
1006 goto end_nosignal;
1007 }
1008 break;
1009 }
1010 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1011 {
1012 uint64_t key = msg.u.destroy_channel.key;
1013 struct lttng_consumer_channel *channel;
1014 int ret_send_status;
1015
1016 channel = consumer_find_channel(key);
1017 if (!channel) {
1018 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
1019 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1020 }
1021
1022 health_code_update();
1023
1024 ret_send_status = consumer_send_status_msg(sock, ret_code);
1025 if (ret_send_status < 0) {
1026 /* Somehow, the session daemon is not responding anymore. */
1027 goto end_destroy_channel;
1028 }
1029
1030 health_code_update();
1031
1032 /* Stop right now if no channel was found. */
1033 if (!channel) {
1034 goto end_destroy_channel;
1035 }
1036
1037 /*
1038 * This command should ONLY be issued for channel with streams set in
1039 * no monitor mode.
1040 */
1041 LTTNG_ASSERT(!channel->monitor);
1042
1043 /*
1044 * The refcount should ALWAYS be 0 in the case of a channel in no
1045 * monitor mode.
1046 */
1047 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
1048
1049 consumer_del_channel(channel);
1050 end_destroy_channel:
1051 goto end_nosignal;
1052 }
1053 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1054 {
1055 ssize_t ret;
1056 uint64_t count;
1057 struct lttng_consumer_channel *channel;
1058 uint64_t id = msg.u.discarded_events.session_id;
1059 uint64_t key = msg.u.discarded_events.channel_key;
1060
1061 DBG("Kernel consumer discarded events command for session id %"
1062 PRIu64 ", channel key %" PRIu64, id, key);
1063
1064 channel = consumer_find_channel(key);
1065 if (!channel) {
1066 ERR("Kernel consumer discarded events channel %"
1067 PRIu64 " not found", key);
1068 count = 0;
1069 } else {
1070 count = channel->discarded_events;
1071 }
1072
1073 health_code_update();
1074
1075 /* Send back returned value to session daemon */
1076 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1077 if (ret < 0) {
1078 PERROR("send discarded events");
1079 goto error_fatal;
1080 }
1081
1082 break;
1083 }
1084 case LTTNG_CONSUMER_LOST_PACKETS:
1085 {
1086 ssize_t ret;
1087 uint64_t count;
1088 struct lttng_consumer_channel *channel;
1089 uint64_t id = msg.u.lost_packets.session_id;
1090 uint64_t key = msg.u.lost_packets.channel_key;
1091
1092 DBG("Kernel consumer lost packets command for session id %"
1093 PRIu64 ", channel key %" PRIu64, id, key);
1094
1095 channel = consumer_find_channel(key);
1096 if (!channel) {
1097 ERR("Kernel consumer lost packets channel %"
1098 PRIu64 " not found", key);
1099 count = 0;
1100 } else {
1101 count = channel->lost_packets;
1102 }
1103
1104 health_code_update();
1105
1106 /* Send back returned value to session daemon */
1107 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1108 if (ret < 0) {
1109 PERROR("send lost packets");
1110 goto error_fatal;
1111 }
1112
1113 break;
1114 }
1115 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1116 {
1117 int channel_monitor_pipe;
1118 int ret_send_status, ret_set_channel_monitor_pipe;
1119 ssize_t ret_recv;
1120
1121 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1122 /* Successfully received the command's type. */
1123 ret_send_status = consumer_send_status_msg(sock, ret_code);
1124 if (ret_send_status < 0) {
1125 goto error_fatal;
1126 }
1127
1128 ret_recv = lttcomm_recv_fds_unix_sock(
1129 sock, &channel_monitor_pipe, 1);
1130 if (ret_recv != sizeof(channel_monitor_pipe)) {
1131 ERR("Failed to receive channel monitor pipe");
1132 goto error_fatal;
1133 }
1134
1135 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1136 ret_set_channel_monitor_pipe =
1137 consumer_timer_thread_set_channel_monitor_pipe(
1138 channel_monitor_pipe);
1139 if (!ret_set_channel_monitor_pipe) {
1140 int flags;
1141 int ret_fcntl;
1142
1143 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1144 /* Set the pipe as non-blocking. */
1145 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1146 if (ret_fcntl == -1) {
1147 PERROR("fcntl get flags of the channel monitoring pipe");
1148 goto error_fatal;
1149 }
1150 flags = ret_fcntl;
1151
1152 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
1153 flags | O_NONBLOCK);
1154 if (ret_fcntl == -1) {
1155 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1156 goto error_fatal;
1157 }
1158 DBG("Channel monitor pipe set as non-blocking");
1159 } else {
1160 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1161 }
1162 ret_send_status = consumer_send_status_msg(sock, ret_code);
1163 if (ret_send_status < 0) {
1164 goto error_fatal;
1165 }
1166 break;
1167 }
1168 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1169 {
1170 struct lttng_consumer_channel *channel;
1171 uint64_t key = msg.u.rotate_channel.key;
1172 int ret_send_status;
1173
1174 DBG("Consumer rotate channel %" PRIu64, key);
1175
1176 channel = consumer_find_channel(key);
1177 if (!channel) {
1178 ERR("Channel %" PRIu64 " not found", key);
1179 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1180 } else {
1181 /*
1182 * Sample the rotate position of all the streams in this channel.
1183 */
1184 int ret_rotate_channel;
1185
1186 ret_rotate_channel = lttng_consumer_rotate_channel(
1187 channel, key,
1188 msg.u.rotate_channel.relayd_id,
1189 msg.u.rotate_channel.metadata, ctx);
1190 if (ret_rotate_channel < 0) {
1191 ERR("Rotate channel failed");
1192 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1193 }
1194
1195 health_code_update();
1196 }
1197
1198 ret_send_status = consumer_send_status_msg(sock, ret_code);
1199 if (ret_send_status < 0) {
1200 /* Somehow, the session daemon is not responding anymore. */
1201 goto error_rotate_channel;
1202 }
1203 if (channel) {
1204 /* Rotate the streams that are ready right now. */
1205 int ret_rotate;
1206
1207 ret_rotate = lttng_consumer_rotate_ready_streams(
1208 channel, key, ctx);
1209 if (ret_rotate < 0) {
1210 ERR("Rotate ready streams failed");
1211 }
1212 }
1213 break;
1214 error_rotate_channel:
1215 goto end_nosignal;
1216 }
1217 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1218 {
1219 struct lttng_consumer_channel *channel;
1220 uint64_t key = msg.u.clear_channel.key;
1221 int ret_send_status;
1222
1223 channel = consumer_find_channel(key);
1224 if (!channel) {
1225 DBG("Channel %" PRIu64 " not found", key);
1226 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1227 } else {
1228 int ret_clear_channel;
1229
1230 ret_clear_channel =
1231 lttng_consumer_clear_channel(channel);
1232 if (ret_clear_channel) {
1233 ERR("Clear channel failed");
1234 ret_code = (lttcomm_return_code) ret_clear_channel;
1235 }
1236
1237 health_code_update();
1238 }
1239
1240 ret_send_status = consumer_send_status_msg(sock, ret_code);
1241 if (ret_send_status < 0) {
1242 /* Somehow, the session daemon is not responding anymore. */
1243 goto end_nosignal;
1244 }
1245
1246 break;
1247 }
1248 case LTTNG_CONSUMER_INIT:
1249 {
1250 int ret_send_status;
1251
1252 ret_code = lttng_consumer_init_command(ctx,
1253 msg.u.init.sessiond_uuid);
1254 health_code_update();
1255 ret_send_status = consumer_send_status_msg(sock, ret_code);
1256 if (ret_send_status < 0) {
1257 /* Somehow, the session daemon is not responding anymore. */
1258 goto end_nosignal;
1259 }
1260 break;
1261 }
1262 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
1263 {
1264 const struct lttng_credentials credentials = {
1265 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1266 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
1267 };
1268 const bool is_local_trace =
1269 !msg.u.create_trace_chunk.relayd_id.is_set;
1270 const uint64_t relayd_id =
1271 msg.u.create_trace_chunk.relayd_id.value;
1272 const char *chunk_override_name =
1273 *msg.u.create_trace_chunk.override_name ?
1274 msg.u.create_trace_chunk.override_name :
1275 NULL;
1276 struct lttng_directory_handle *chunk_directory_handle = NULL;
1277
1278 /*
1279 * The session daemon will only provide a chunk directory file
1280 * descriptor for local traces.
1281 */
1282 if (is_local_trace) {
1283 int chunk_dirfd;
1284 int ret_send_status;
1285 ssize_t ret_recv;
1286
1287 /* Acnowledge the reception of the command. */
1288 ret_send_status = consumer_send_status_msg(
1289 sock, LTTCOMM_CONSUMERD_SUCCESS);
1290 if (ret_send_status < 0) {
1291 /* Somehow, the session daemon is not responding anymore. */
1292 goto end_nosignal;
1293 }
1294
1295 ret_recv = lttcomm_recv_fds_unix_sock(
1296 sock, &chunk_dirfd, 1);
1297 if (ret_recv != sizeof(chunk_dirfd)) {
1298 ERR("Failed to receive trace chunk directory file descriptor");
1299 goto error_fatal;
1300 }
1301
1302 DBG("Received trace chunk directory fd (%d)",
1303 chunk_dirfd);
1304 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
1305 chunk_dirfd);
1306 if (!chunk_directory_handle) {
1307 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1308 if (close(chunk_dirfd)) {
1309 PERROR("Failed to close chunk directory file descriptor");
1310 }
1311 goto error_fatal;
1312 }
1313 }
1314
1315 ret_code = lttng_consumer_create_trace_chunk(
1316 !is_local_trace ? &relayd_id : NULL,
1317 msg.u.create_trace_chunk.session_id,
1318 msg.u.create_trace_chunk.chunk_id,
1319 (time_t) msg.u.create_trace_chunk
1320 .creation_timestamp,
1321 chunk_override_name,
1322 msg.u.create_trace_chunk.credentials.is_set ?
1323 &credentials :
1324 NULL,
1325 chunk_directory_handle);
1326 lttng_directory_handle_put(chunk_directory_handle);
1327 goto end_msg_sessiond;
1328 }
1329 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
1330 {
1331 enum lttng_trace_chunk_command_type close_command =
1332 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
1333 const uint64_t relayd_id =
1334 msg.u.close_trace_chunk.relayd_id.value;
1335 struct lttcomm_consumer_close_trace_chunk_reply reply;
1336 char path[LTTNG_PATH_MAX];
1337 ssize_t ret_send;
1338
1339 ret_code = lttng_consumer_close_trace_chunk(
1340 msg.u.close_trace_chunk.relayd_id.is_set ?
1341 &relayd_id :
1342 NULL,
1343 msg.u.close_trace_chunk.session_id,
1344 msg.u.close_trace_chunk.chunk_id,
1345 (time_t) msg.u.close_trace_chunk.close_timestamp,
1346 msg.u.close_trace_chunk.close_command.is_set ?
1347 &close_command :
1348 NULL, path);
1349 reply.ret_code = ret_code;
1350 reply.path_length = strlen(path) + 1;
1351 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1352 if (ret_send != sizeof(reply)) {
1353 goto error_fatal;
1354 }
1355 ret_send = lttcomm_send_unix_sock(
1356 sock, path, reply.path_length);
1357 if (ret_send != reply.path_length) {
1358 goto error_fatal;
1359 }
1360 goto end_nosignal;
1361 }
1362 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
1363 {
1364 const uint64_t relayd_id =
1365 msg.u.trace_chunk_exists.relayd_id.value;
1366
1367 ret_code = lttng_consumer_trace_chunk_exists(
1368 msg.u.trace_chunk_exists.relayd_id.is_set ?
1369 &relayd_id : NULL,
1370 msg.u.trace_chunk_exists.session_id,
1371 msg.u.trace_chunk_exists.chunk_id);
1372 goto end_msg_sessiond;
1373 }
1374 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1375 {
1376 const uint64_t key = msg.u.open_channel_packets.key;
1377 struct lttng_consumer_channel *channel =
1378 consumer_find_channel(key);
1379
1380 if (channel) {
1381 pthread_mutex_lock(&channel->lock);
1382 ret_code = lttng_consumer_open_channel_packets(channel);
1383 pthread_mutex_unlock(&channel->lock);
1384 } else {
1385 WARN("Channel %" PRIu64 " not found", key);
1386 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1387 }
1388
1389 health_code_update();
1390 goto end_msg_sessiond;
1391 }
1392 default:
1393 goto end_nosignal;
1394 }
1395
1396 end_nosignal:
1397 /*
1398 * Return 1 to indicate success since the 0 value can be a socket
1399 * shutdown during the recv() or send() call.
1400 */
1401 ret_func = 1;
1402 goto end;
1403 error_fatal:
1404 /* This will issue a consumer stop. */
1405 ret_func = -1;
1406 goto end;
1407 end_msg_sessiond:
1408 /*
1409 * The returned value here is not useful since either way we'll return 1 to
1410 * the caller because the session daemon socket management is done
1411 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1412 */
1413 {
1414 int ret_send_status;
1415
1416 ret_send_status = consumer_send_status_msg(sock, ret_code);
1417 if (ret_send_status < 0) {
1418 goto error_fatal;
1419 }
1420 }
1421
1422 ret_func = 1;
1423
1424 end:
1425 health_code_update();
1426 rcu_read_unlock();
1427 return ret_func;
1428 }
1429
1430 /*
1431 * Sync metadata meaning request them to the session daemon and snapshot to the
1432 * metadata thread can consumer them.
1433 *
1434 * Metadata stream lock MUST be acquired.
1435 */
1436 enum sync_metadata_status lttng_kconsumer_sync_metadata(
1437 struct lttng_consumer_stream *metadata)
1438 {
1439 int ret;
1440 enum sync_metadata_status status;
1441
1442 LTTNG_ASSERT(metadata);
1443
1444 ret = kernctl_buffer_flush(metadata->wait_fd);
1445 if (ret < 0) {
1446 ERR("Failed to flush kernel stream");
1447 status = SYNC_METADATA_STATUS_ERROR;
1448 goto end;
1449 }
1450
1451 ret = kernctl_snapshot(metadata->wait_fd);
1452 if (ret < 0) {
1453 if (errno == EAGAIN) {
1454 /* No new metadata, exit. */
1455 DBG("Sync metadata, no new kernel metadata");
1456 status = SYNC_METADATA_STATUS_NO_DATA;
1457 } else {
1458 ERR("Sync metadata, taking kernel snapshot failed.");
1459 status = SYNC_METADATA_STATUS_ERROR;
1460 }
1461 } else {
1462 status = SYNC_METADATA_STATUS_NEW_DATA;
1463 }
1464
1465 end:
1466 return status;
1467 }
1468
1469 static
1470 int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1471 struct stream_subbuffer *subbuf)
1472 {
1473 int ret;
1474
1475 ret = kernctl_get_subbuf_size(
1476 stream->wait_fd, &subbuf->info.data.subbuf_size);
1477 if (ret) {
1478 goto end;
1479 }
1480
1481 ret = kernctl_get_padded_subbuf_size(
1482 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1483 if (ret) {
1484 goto end;
1485 }
1486
1487 end:
1488 return ret;
1489 }
1490
1491 static
1492 int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1493 struct stream_subbuffer *subbuf)
1494 {
1495 int ret;
1496
1497 ret = extract_common_subbuffer_info(stream, subbuf);
1498 if (ret) {
1499 goto end;
1500 }
1501
1502 ret = kernctl_get_metadata_version(
1503 stream->wait_fd, &subbuf->info.metadata.version);
1504 if (ret) {
1505 goto end;
1506 }
1507
1508 end:
1509 return ret;
1510 }
1511
1512 static
1513 int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1514 struct stream_subbuffer *subbuf)
1515 {
1516 int ret;
1517
1518 ret = extract_common_subbuffer_info(stream, subbuf);
1519 if (ret) {
1520 goto end;
1521 }
1522
1523 ret = kernctl_get_packet_size(
1524 stream->wait_fd, &subbuf->info.data.packet_size);
1525 if (ret < 0) {
1526 PERROR("Failed to get sub-buffer packet size");
1527 goto end;
1528 }
1529
1530 ret = kernctl_get_content_size(
1531 stream->wait_fd, &subbuf->info.data.content_size);
1532 if (ret < 0) {
1533 PERROR("Failed to get sub-buffer content size");
1534 goto end;
1535 }
1536
1537 ret = kernctl_get_timestamp_begin(
1538 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1539 if (ret < 0) {
1540 PERROR("Failed to get sub-buffer begin timestamp");
1541 goto end;
1542 }
1543
1544 ret = kernctl_get_timestamp_end(
1545 stream->wait_fd, &subbuf->info.data.timestamp_end);
1546 if (ret < 0) {
1547 PERROR("Failed to get sub-buffer end timestamp");
1548 goto end;
1549 }
1550
1551 ret = kernctl_get_events_discarded(
1552 stream->wait_fd, &subbuf->info.data.events_discarded);
1553 if (ret) {
1554 PERROR("Failed to get sub-buffer events discarded count");
1555 goto end;
1556 }
1557
1558 ret = kernctl_get_sequence_number(stream->wait_fd,
1559 &subbuf->info.data.sequence_number.value);
1560 if (ret) {
1561 /* May not be supported by older LTTng-modules. */
1562 if (ret != -ENOTTY) {
1563 PERROR("Failed to get sub-buffer sequence number");
1564 goto end;
1565 }
1566 } else {
1567 subbuf->info.data.sequence_number.is_set = true;
1568 }
1569
1570 ret = kernctl_get_stream_id(
1571 stream->wait_fd, &subbuf->info.data.stream_id);
1572 if (ret < 0) {
1573 PERROR("Failed to get stream id");
1574 goto end;
1575 }
1576
1577 ret = kernctl_get_instance_id(stream->wait_fd,
1578 &subbuf->info.data.stream_instance_id.value);
1579 if (ret) {
1580 /* May not be supported by older LTTng-modules. */
1581 if (ret != -ENOTTY) {
1582 PERROR("Failed to get stream instance id");
1583 goto end;
1584 }
1585 } else {
1586 subbuf->info.data.stream_instance_id.is_set = true;
1587 }
1588 end:
1589 return ret;
1590 }
1591
1592 static
1593 enum get_next_subbuffer_status get_subbuffer_common(
1594 struct lttng_consumer_stream *stream,
1595 struct stream_subbuffer *subbuffer)
1596 {
1597 int ret;
1598 enum get_next_subbuffer_status status;
1599
1600 ret = kernctl_get_next_subbuf(stream->wait_fd);
1601 switch (ret) {
1602 case 0:
1603 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1604 break;
1605 case -ENODATA:
1606 case -EAGAIN:
1607 /*
1608 * The caller only expects -ENODATA when there is no data to
1609 * read, but the kernel tracer returns -EAGAIN when there is
1610 * currently no data for a non-finalized stream, and -ENODATA
1611 * when there is no data for a finalized stream. Those can be
1612 * combined into a -ENODATA return value.
1613 */
1614 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1615 goto end;
1616 default:
1617 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1618 goto end;
1619 }
1620
1621 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1622 stream, subbuffer);
1623 if (ret) {
1624 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1625 }
1626 end:
1627 return status;
1628 }
1629
1630 static
1631 enum get_next_subbuffer_status get_next_subbuffer_splice(
1632 struct lttng_consumer_stream *stream,
1633 struct stream_subbuffer *subbuffer)
1634 {
1635 const enum get_next_subbuffer_status status =
1636 get_subbuffer_common(stream, subbuffer);
1637
1638 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1639 goto end;
1640 }
1641
1642 subbuffer->buffer.fd = stream->wait_fd;
1643 end:
1644 return status;
1645 }
1646
1647 static
1648 enum get_next_subbuffer_status get_next_subbuffer_mmap(
1649 struct lttng_consumer_stream *stream,
1650 struct stream_subbuffer *subbuffer)
1651 {
1652 int ret;
1653 enum get_next_subbuffer_status status;
1654 const char *addr;
1655
1656 status = get_subbuffer_common(stream, subbuffer);
1657 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1658 goto end;
1659 }
1660
1661 ret = get_current_subbuf_addr(stream, &addr);
1662 if (ret) {
1663 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1664 goto end;
1665 }
1666
1667 subbuffer->buffer.buffer = lttng_buffer_view_init(
1668 addr, 0, subbuffer->info.data.padded_subbuf_size);
1669 end:
1670 return status;
1671 }
1672
1673 static
1674 enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
1675 struct stream_subbuffer *subbuffer)
1676 {
1677 int ret;
1678 const char *addr;
1679 bool coherent;
1680 enum get_next_subbuffer_status status;
1681
1682 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
1683 &coherent);
1684 if (ret) {
1685 goto end;
1686 }
1687
1688 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1689 stream, subbuffer);
1690 if (ret) {
1691 goto end;
1692 }
1693
1694 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1695
1696 ret = get_current_subbuf_addr(stream, &addr);
1697 if (ret) {
1698 goto end;
1699 }
1700
1701 subbuffer->buffer.buffer = lttng_buffer_view_init(
1702 addr, 0, subbuffer->info.data.padded_subbuf_size);
1703 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1704 subbuffer->info.metadata.padded_subbuf_size,
1705 coherent ? "true" : "false");
1706 end:
1707 /*
1708 * The caller only expects -ENODATA when there is no data to read, but
1709 * the kernel tracer returns -EAGAIN when there is currently no data
1710 * for a non-finalized stream, and -ENODATA when there is no data for a
1711 * finalized stream. Those can be combined into a -ENODATA return value.
1712 */
1713 switch (ret) {
1714 case 0:
1715 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1716 break;
1717 case -ENODATA:
1718 case -EAGAIN:
1719 /*
1720 * The caller only expects -ENODATA when there is no data to
1721 * read, but the kernel tracer returns -EAGAIN when there is
1722 * currently no data for a non-finalized stream, and -ENODATA
1723 * when there is no data for a finalized stream. Those can be
1724 * combined into a -ENODATA return value.
1725 */
1726 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1727 break;
1728 default:
1729 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1730 break;
1731 }
1732
1733 return status;
1734 }
1735
1736 static
1737 int put_next_subbuffer(struct lttng_consumer_stream *stream,
1738 struct stream_subbuffer *subbuffer)
1739 {
1740 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1741
1742 if (ret) {
1743 if (ret == -EFAULT) {
1744 PERROR("Error in unreserving sub buffer");
1745 } else if (ret == -EIO) {
1746 /* Should never happen with newer LTTng versions */
1747 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
1748 }
1749 }
1750
1751 return ret;
1752 }
1753
1754 static
1755 bool is_get_next_check_metadata_available(int tracer_fd)
1756 {
1757 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1758 const bool available = ret != -ENOTTY;
1759
1760 if (ret == 0) {
1761 /* get succeeded, make sure to put the subbuffer. */
1762 kernctl_put_subbuf(tracer_fd);
1763 }
1764
1765 return available;
1766 }
1767
1768 static
1769 int signal_metadata(struct lttng_consumer_stream *stream,
1770 struct lttng_consumer_local_data *ctx)
1771 {
1772 ASSERT_LOCKED(stream->metadata_rdv_lock);
1773 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1774 }
1775
1776 static
1777 int lttng_kconsumer_set_stream_ops(
1778 struct lttng_consumer_stream *stream)
1779 {
1780 int ret = 0;
1781
1782 if (stream->metadata_flag && stream->chan->is_live) {
1783 DBG("Attempting to enable metadata bucketization for live consumers");
1784 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1785 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1786 stream->read_subbuffer_ops.get_next_subbuffer =
1787 get_next_subbuffer_metadata_check;
1788 ret = consumer_stream_enable_metadata_bucketization(
1789 stream);
1790 if (ret) {
1791 goto end;
1792 }
1793 } else {
1794 /*
1795 * The kernel tracer version is too old to indicate
1796 * when the metadata stream has reached a "coherent"
1797 * (parseable) point.
1798 *
1799 * This means that a live viewer may see an incoherent
1800 * sequence of metadata and fail to parse it.
1801 */
1802 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1803 metadata_bucket_destroy(stream->metadata_bucket);
1804 stream->metadata_bucket = NULL;
1805 }
1806
1807 stream->read_subbuffer_ops.on_sleep = signal_metadata;
1808 }
1809
1810 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1811 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1812 stream->read_subbuffer_ops.get_next_subbuffer =
1813 get_next_subbuffer_mmap;
1814 } else {
1815 stream->read_subbuffer_ops.get_next_subbuffer =
1816 get_next_subbuffer_splice;
1817 }
1818 }
1819
1820 if (stream->metadata_flag) {
1821 stream->read_subbuffer_ops.extract_subbuffer_info =
1822 extract_metadata_subbuffer_info;
1823 } else {
1824 stream->read_subbuffer_ops.extract_subbuffer_info =
1825 extract_data_subbuffer_info;
1826 if (stream->chan->is_live) {
1827 stream->read_subbuffer_ops.send_live_beacon =
1828 consumer_flush_kernel_index;
1829 }
1830 }
1831
1832 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
1833 end:
1834 return ret;
1835 }
1836
/*
 * Prepare a newly-received kernel stream for consumption: create its
 * on-disk output files (local traces only), map its ring buffer when the
 * stream uses mmap output, and install the read_subbuffer operations.
 *
 * Returns 0 on success (the library then manages the FD internally), a
 * negative value on error.
 */
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	LTTNG_ASSERT(stream);

	/*
	 * Don't create anything if this is set for streaming or if there is
	 * no current trace chunk on the parent channel.
	 */
	if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
			stream->chan->trace_chunk) {
		ret = consumer_stream_create_output_files(stream, true);
		if (ret) {
			goto error;
		}
	}

	if (stream->output == LTTNG_EVENT_MMAP) {
		/* get the len of the mmap region */
		unsigned long mmap_len;

		ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
		if (ret != 0) {
			PERROR("kernctl_get_mmap_len");
			goto error_close_fd;
		}
		stream->mmap_len = (size_t) mmap_len;

		/* Read-only, private mapping of the tracer's ring buffer. */
		stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
				MAP_PRIVATE, stream->wait_fd, 0);
		if (stream->mmap_base == MAP_FAILED) {
			PERROR("Error mmaping");
			ret = -1;
			goto error_close_fd;
		}
	}

	/*
	 * NOTE(review): if this fails after a successful mmap above, the
	 * mapping is not unmapped here — presumably reclaimed when the
	 * stream is destroyed; confirm against consumer_stream teardown.
	 */
	ret = lttng_kconsumer_set_stream_ops(stream);
	if (ret) {
		goto error_close_fd;
	}

	/* we return 0 to let the library handle the FD internally */
	return 0;

error_close_fd:
	/* Close the output file opened above, if any, before bailing out. */
	if (stream->out_fd >= 0) {
		int err;

		err = close(stream->out_fd);
		LTTNG_ASSERT(!err);
		stream->out_fd = -1;
	}
error:
	return ret;
}
1894
1895 /*
1896 * Check if data is still being extracted from the buffers for a specific
1897 * stream. Consumer data lock MUST be acquired before calling this function
1898 * and the stream lock.
1899 *
1900 * Return 1 if the traced data are still getting read else 0 meaning that the
1901 * data is available for trace viewer reading.
1902 */
1903 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
1904 {
1905 int ret;
1906
1907 LTTNG_ASSERT(stream);
1908
1909 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1910 ret = 0;
1911 goto end;
1912 }
1913
1914 ret = kernctl_get_next_subbuf(stream->wait_fd);
1915 if (ret == 0) {
1916 /* There is still data so let's put back this subbuffer. */
1917 ret = kernctl_put_subbuf(stream->wait_fd);
1918 LTTNG_ASSERT(ret == 0);
1919 ret = 1; /* Data is pending */
1920 goto end;
1921 }
1922
1923 /* Data is NOT pending and ready to be read. */
1924 ret = 0;
1925
1926 end:
1927 return ret;
1928 }
This page took 0.073667 seconds and 4 git commands to generate.