Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.cpp
... / ...
CommitLineData
1/*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * SPDX-License-Identifier: GPL-2.0-only
6 *
7 */
8
9#define _LGPL_SOURCE
10#include "consumer-output.hpp"
11#include "consumer.hpp"
12#include "health-sessiond.hpp"
13#include "lttng-sessiond.hpp"
14#include "ust-app.hpp"
15#include "utils.hpp"
16
17#include <common/common.hpp>
18#include <common/defaults.hpp>
19#include <common/relayd/relayd.hpp>
20#include <common/string-utils/format.hpp>
21#include <common/urcu.hpp>
22#include <common/uri.hpp>
23
24#include <inttypes.h>
25#include <stdio.h>
26#include <stdlib.h>
27#include <string.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <unistd.h>
31
32/*
33 * Return allocated full pathname of the session using the consumer trace path
34 * and subdir if available.
35 *
36 * The caller can safely free(3) the returned value. On error, NULL is
37 * returned.
38 */
39char *setup_channel_trace_path(struct consumer_output *consumer,
40 const char *session_path,
41 size_t *consumer_path_offset)
42{
43 int ret;
44 char *pathname;
45
46 LTTNG_ASSERT(consumer);
47 LTTNG_ASSERT(session_path);
48
49 health_code_update();
50
51 /*
52 * Allocate the string ourself to make sure we never exceed
53 * LTTNG_PATH_MAX.
54 */
55 pathname = calloc<char>(LTTNG_PATH_MAX);
56 if (!pathname) {
57 goto error;
58 }
59
60 /* Get correct path name destination */
61 if (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
62 consumer->relay_minor_version < 11) {
63 ret = snprintf(pathname,
64 LTTNG_PATH_MAX,
65 "%s%s/%s/%s",
66 consumer->dst.net.base_dir,
67 consumer->chunk_path,
68 consumer->domain_subdir,
69 session_path);
70 *consumer_path_offset = 0;
71 } else {
72 ret = snprintf(
73 pathname, LTTNG_PATH_MAX, "%s/%s", consumer->domain_subdir, session_path);
74 *consumer_path_offset = strlen(consumer->domain_subdir) + 1;
75 }
76 DBG3("Consumer trace path relative to current trace chunk: \"%s\"", pathname);
77 if (ret < 0) {
78 PERROR("Failed to format channel path");
79 goto error;
80 } else if (ret >= LTTNG_PATH_MAX) {
81 ERR("Truncation occurred while formatting channel path");
82 goto error;
83 }
84
85 return pathname;
86error:
87 free(pathname);
88 return nullptr;
89}
90
91/*
92 * Send a data payload using a given consumer socket of size len.
93 *
94 * The consumer socket lock MUST be acquired before calling this since this
95 * function can change the fd value.
96 *
97 * Return 0 on success else a negative value on error.
98 */
99int consumer_socket_send(struct consumer_socket *socket, const void *msg, size_t len)
100{
101 int fd;
102 ssize_t size;
103
104 LTTNG_ASSERT(socket);
105 LTTNG_ASSERT(socket->fd_ptr);
106 LTTNG_ASSERT(msg);
107
108 /* Consumer socket is invalid. Stopping. */
109 fd = *socket->fd_ptr;
110 if (fd < 0) {
111 goto error;
112 }
113
114 size = lttcomm_send_unix_sock(fd, msg, len);
115 if (size < 0) {
116 /* The above call will print a PERROR on error. */
117 DBG("Error when sending data to consumer on sock %d", fd);
118 /*
119 * At this point, the socket is not usable anymore thus closing it and
120 * setting the file descriptor to -1 so it is not reused.
121 */
122
123 /* This call will PERROR on error. */
124 (void) lttcomm_close_unix_sock(fd);
125 *socket->fd_ptr = -1;
126 goto error;
127 }
128
129 return 0;
130
131error:
132 return -1;
133}
134
135/*
136 * Receive a data payload using a given consumer socket of size len.
137 *
138 * The consumer socket lock MUST be acquired before calling this since this
139 * function can change the fd value.
140 *
141 * Return 0 on success else a negative value on error.
142 */
143int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len)
144{
145 int fd;
146 ssize_t size;
147
148 LTTNG_ASSERT(socket);
149 LTTNG_ASSERT(socket->fd_ptr);
150 LTTNG_ASSERT(msg);
151
152 /* Consumer socket is invalid. Stopping. */
153 fd = *socket->fd_ptr;
154 if (fd < 0) {
155 goto error;
156 }
157
158 size = lttcomm_recv_unix_sock(fd, msg, len);
159 if (size <= 0) {
160 /* The above call will print a PERROR on error. */
161 DBG("Error when receiving data from the consumer socket %d", fd);
162 /*
163 * At this point, the socket is not usable anymore thus closing it and
164 * setting the file descriptor to -1 so it is not reused.
165 */
166
167 /* This call will PERROR on error. */
168 (void) lttcomm_close_unix_sock(fd);
169 *socket->fd_ptr = -1;
170 goto error;
171 }
172
173 return 0;
174
175error:
176 return -1;
177}
178
179/*
180 * Receive a reply command status message from the consumer. Consumer socket
181 * lock MUST be acquired before calling this function.
182 *
183 * Return 0 on success, -1 on recv error or a negative lttng error code which
184 * was possibly returned by the consumer.
185 */
186int consumer_recv_status_reply(struct consumer_socket *sock)
187{
188 int ret;
189 struct lttcomm_consumer_status_msg reply;
190
191 LTTNG_ASSERT(sock);
192
193 ret = consumer_socket_recv(sock, &reply, sizeof(reply));
194 if (ret < 0) {
195 goto end;
196 }
197
198 if (reply.ret_code == LTTCOMM_CONSUMERD_SUCCESS) {
199 /* All good. */
200 ret = 0;
201 } else {
202 ret = -reply.ret_code;
203 DBG("Consumer ret code %d", ret);
204 }
205
206end:
207 return ret;
208}
209
210/*
211 * Once the ASK_CHANNEL command is sent to the consumer, the channel
212 * information are sent back. This call receives that data and populates key
213 * and stream_count.
214 *
215 * On success return 0 and both key and stream_count are set. On error, a
216 * negative value is sent back and both parameters are untouched.
217 */
218int consumer_recv_status_channel(struct consumer_socket *sock,
219 uint64_t *key,
220 unsigned int *stream_count)
221{
222 int ret;
223 struct lttcomm_consumer_status_channel reply;
224
225 LTTNG_ASSERT(sock);
226 LTTNG_ASSERT(stream_count);
227 LTTNG_ASSERT(key);
228
229 ret = consumer_socket_recv(sock, &reply, sizeof(reply));
230 if (ret < 0) {
231 goto end;
232 }
233
234 /* An error is possible so don't touch the key and stream_count. */
235 if (reply.ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
236 ret = -1;
237 goto end;
238 }
239
240 *key = reply.key;
241 *stream_count = reply.stream_count;
242 ret = 0;
243
244end:
245 return ret;
246}
247
248/*
249 * Send destroy relayd command to consumer.
250 *
251 * On success return positive value. On error, negative value.
252 */
253int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer)
254{
255 int ret;
256 struct lttcomm_consumer_msg msg;
257
258 LTTNG_ASSERT(consumer);
259 LTTNG_ASSERT(sock);
260
261 DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd_ptr);
262
263 memset(&msg, 0, sizeof(msg));
264 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
265 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
266
267 pthread_mutex_lock(sock->lock);
268 ret = consumer_socket_send(sock, &msg, sizeof(msg));
269 if (ret < 0) {
270 goto error;
271 }
272
273 /* Don't check the return value. The caller will do it. */
274 ret = consumer_recv_status_reply(sock);
275
276 DBG2("Consumer send destroy relayd command done");
277
278error:
279 pthread_mutex_unlock(sock->lock);
280 return ret;
281}
282
283/*
284 * For each consumer socket in the consumer output object, send a destroy
285 * relayd command.
286 */
287void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
288{
289 LTTNG_ASSERT(consumer);
290
291 /* Destroy any relayd connection */
292 if (consumer->type != CONSUMER_DST_NET) {
293 return;
294 }
295
296 for (auto *socket :
297 lttng::urcu::lfht_iteration_adapter<consumer_socket,
298 decltype(consumer_socket::node),
299 &consumer_socket::node>(*consumer->socks->ht)) {
300 /* Send destroy relayd command. */
301 const int ret = consumer_send_destroy_relayd(socket, consumer);
302
303 if (ret < 0) {
304 DBG("Unable to send destroy relayd command to consumer");
305 /* Continue since we MUST delete everything at this point. */
306 }
307 }
308}
309
310/*
311 * From a consumer_data structure, allocate and add a consumer socket to the
312 * consumer output.
313 *
314 * Return 0 on success, else negative value on error
315 */
316int consumer_create_socket(struct consumer_data *data, struct consumer_output *output)
317{
318 int ret = 0;
319 struct consumer_socket *socket;
320
321 LTTNG_ASSERT(data);
322
323 const lttng::urcu::read_lock_guard read_lock;
324
325 if (output == nullptr || data->cmd_sock < 0) {
326 /*
327 * Not an error. Possible there is simply not spawned consumer or it's
328 * disabled for the tracing session asking the socket.
329 */
330 goto error;
331 }
332
333 socket = consumer_find_socket(data->cmd_sock, output);
334 if (socket == nullptr) {
335 socket = consumer_allocate_socket(&data->cmd_sock);
336 if (socket == nullptr) {
337 ret = -1;
338 goto error;
339 }
340
341 socket->registered = 0;
342 socket->lock = &data->lock;
343 consumer_add_socket(socket, output);
344 }
345
346 socket->type = data->type;
347
348 DBG3("Consumer socket created (fd: %d) and added to output", data->cmd_sock);
349
350error:
351 return ret;
352}
353
354/*
355 * Return the consumer socket from the given consumer output with the right
356 * bitness. On error, returns NULL.
357 *
358 * The caller MUST acquire a rcu read side lock and keep it until the socket
359 * object reference is not needed anymore.
360 */
361struct consumer_socket *consumer_find_socket_by_bitness(int bits,
362 const struct consumer_output *consumer)
363{
364 int consumer_fd;
365 struct consumer_socket *socket = nullptr;
366
367 ASSERT_RCU_READ_LOCKED();
368
369 switch (bits) {
370 case 64:
371 consumer_fd = uatomic_read(&the_ust_consumerd64_fd);
372 break;
373 case 32:
374 consumer_fd = uatomic_read(&the_ust_consumerd32_fd);
375 break;
376 default:
377 abort();
378 goto end;
379 }
380
381 socket = consumer_find_socket(consumer_fd, consumer);
382 if (!socket) {
383 ERR("Consumer socket fd %d not found in consumer obj %p", consumer_fd, consumer);
384 }
385
386end:
387 return socket;
388}
389
390/*
391 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
392 * be acquired before calling this function and across use of the
393 * returned consumer_socket.
394 */
395struct consumer_socket *consumer_find_socket(int key, const struct consumer_output *consumer)
396{
397 struct lttng_ht_iter iter;
398 struct lttng_ht_node_ulong *node;
399 struct consumer_socket *socket = nullptr;
400
401 ASSERT_RCU_READ_LOCKED();
402
403 /* Negative keys are lookup failures */
404 if (key < 0 || consumer == nullptr) {
405 return nullptr;
406 }
407
408 lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter);
409 node = lttng_ht_iter_get_node<lttng_ht_node_ulong>(&iter);
410 if (node != nullptr) {
411 socket = lttng::utils::container_of(node, &consumer_socket::node);
412 }
413
414 return socket;
415}
416
417/*
418 * Allocate a new consumer_socket and return the pointer.
419 */
420struct consumer_socket *consumer_allocate_socket(int *fd)
421{
422 struct consumer_socket *socket = nullptr;
423
424 LTTNG_ASSERT(fd);
425
426 socket = zmalloc<consumer_socket>();
427 if (socket == nullptr) {
428 PERROR("zmalloc consumer socket");
429 goto error;
430 }
431
432 socket->fd_ptr = fd;
433 lttng_ht_node_init_ulong(&socket->node, *fd);
434
435error:
436 return socket;
437}
438
439/*
440 * Add consumer socket to consumer output object. Read side lock must be
441 * acquired before calling this function.
442 */
443void consumer_add_socket(struct consumer_socket *sock, struct consumer_output *consumer)
444{
445 LTTNG_ASSERT(sock);
446 LTTNG_ASSERT(consumer);
447 ASSERT_RCU_READ_LOCKED();
448
449 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
450}
451
452/*
453 * Delete consumer socket to consumer output object. Read side lock must be
454 * acquired before calling this function.
455 */
456void consumer_del_socket(struct consumer_socket *sock, struct consumer_output *consumer)
457{
458 int ret;
459 struct lttng_ht_iter iter;
460
461 LTTNG_ASSERT(sock);
462 LTTNG_ASSERT(consumer);
463 ASSERT_RCU_READ_LOCKED();
464
465 iter.iter.node = &sock->node.node;
466 ret = lttng_ht_del(consumer->socks, &iter);
467 LTTNG_ASSERT(!ret);
468}
469
470/*
471 * RCU destroy call function.
472 */
473static void destroy_socket_rcu(struct rcu_head *head)
474{
475 struct lttng_ht_node_ulong *node =
476 lttng::utils::container_of(head, &lttng_ht_node_ulong::head);
477 struct consumer_socket *socket = lttng::utils::container_of(node, &consumer_socket::node);
478
479 free(socket);
480}
481
482/*
483 * Destroy and free socket pointer in a call RCU. The call must either:
484 * - have acquired the read side lock before calling this function, or
485 * - guarantee the validity of the `struct consumer_socket` object for the
486 * duration of the call.
487 */
488void consumer_destroy_socket(struct consumer_socket *sock)
489{
490 LTTNG_ASSERT(sock);
491
492 /*
493 * We DO NOT close the file descriptor here since it is global to the
494 * session daemon and is closed only if the consumer dies or a custom
495 * consumer was registered,
496 */
497 if (sock->registered) {
498 DBG3("Consumer socket was registered. Closing fd %d", *sock->fd_ptr);
499 lttcomm_close_unix_sock(*sock->fd_ptr);
500 }
501
502 call_rcu(&sock->node.head, destroy_socket_rcu);
503}
504
505/*
506 * Allocate and assign data to a consumer_output object.
507 *
508 * Return pointer to structure.
509 */
510struct consumer_output *consumer_create_output(enum consumer_dst_type type)
511{
512 struct consumer_output *output = nullptr;
513
514 output = zmalloc<consumer_output>();
515 if (output == nullptr) {
516 PERROR("zmalloc consumer_output");
517 goto error;
518 }
519
520 /* By default, consumer output is enabled */
521 output->enabled = true;
522 output->type = type;
523 output->net_seq_index = (uint64_t) -1ULL;
524 urcu_ref_init(&output->ref);
525
526 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
527
528error:
529 return output;
530}
531
532/*
533 * Iterate over the consumer output socket hash table and destroy them. The
534 * socket file descriptor are only closed if the consumer output was
535 * registered meaning it's an external consumer.
536 */
537void consumer_destroy_output_sockets(struct consumer_output *obj)
538{
539 if (!obj->socks) {
540 return;
541 }
542
543 for (auto *socket :
544 lttng::urcu::lfht_iteration_adapter<consumer_socket,
545 decltype(consumer_socket::node),
546 &consumer_socket::node>(*obj->socks->ht)) {
547 consumer_del_socket(socket, obj);
548 consumer_destroy_socket(socket);
549 }
550}
551
552/*
553 * Delete the consumer_output object from the list and free the ptr.
554 */
555static void consumer_release_output(struct urcu_ref *ref)
556{
557 struct consumer_output *obj = lttng::utils::container_of(ref, &consumer_output::ref);
558
559 consumer_destroy_output_sockets(obj);
560
561 if (obj->socks) {
562 /* Finally destroy HT */
563 lttng_ht_destroy(obj->socks);
564 }
565
566 free(obj);
567}
568
569/*
570 * Get the consumer_output object.
571 */
572void consumer_output_get(struct consumer_output *obj)
573{
574 urcu_ref_get(&obj->ref);
575}
576
577/*
578 * Put the consumer_output object.
579 */
580void consumer_output_put(struct consumer_output *obj)
581{
582 if (!obj) {
583 return;
584 }
585 urcu_ref_put(&obj->ref, consumer_release_output);
586}
587
588/*
589 * Copy consumer output and returned the newly allocated copy.
590 */
591struct consumer_output *consumer_copy_output(struct consumer_output *src)
592{
593 int ret;
594 struct consumer_output *output;
595
596 LTTNG_ASSERT(src);
597
598 output = consumer_create_output(src->type);
599 if (output == nullptr) {
600 goto end;
601 }
602 output->enabled = src->enabled;
603 output->net_seq_index = src->net_seq_index;
604 memcpy(output->domain_subdir, src->domain_subdir, sizeof(output->domain_subdir));
605 output->snapshot = src->snapshot;
606 output->relay_major_version = src->relay_major_version;
607 output->relay_minor_version = src->relay_minor_version;
608 output->relay_allows_clear = src->relay_allows_clear;
609 memcpy(&output->dst, &src->dst, sizeof(output->dst));
610 ret = consumer_copy_sockets(output, src);
611 if (ret < 0) {
612 goto error_put;
613 }
614end:
615 return output;
616
617error_put:
618 consumer_output_put(output);
619 return nullptr;
620}
621
622/*
623 * Copy consumer sockets from src to dst.
624 *
625 * Return 0 on success or else a negative value.
626 */
627int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src)
628{
629 int ret = 0;
630
631 LTTNG_ASSERT(dst);
632 LTTNG_ASSERT(src);
633
634 for (auto *socket :
635 lttng::urcu::lfht_iteration_adapter<consumer_socket,
636 decltype(consumer_socket::node),
637 &consumer_socket::node>(*src->socks->ht)) {
638 /* Ignore socket that are already there. */
639 auto *copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
640 if (copy_sock) {
641 continue;
642 }
643
644 /* Create new socket object. */
645 copy_sock = consumer_allocate_socket(socket->fd_ptr);
646 if (copy_sock == nullptr) {
647 ret = -ENOMEM;
648 goto error;
649 }
650
651 copy_sock->registered = socket->registered;
652 /*
653 * This is valid because this lock is shared accross all consumer
654 * object being the global lock of the consumer data structure of the
655 * session daemon.
656 */
657 copy_sock->lock = socket->lock;
658 consumer_add_socket(copy_sock, dst);
659 }
660
661error:
662 return ret;
663}
664
665/*
666 * Set network URI to the consumer output.
667 *
668 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
669 * error.
670 */
671int consumer_set_network_uri(const ltt_session::locked_ref& session,
672 struct consumer_output *output,
673 struct lttng_uri *uri)
674{
675 int ret;
676 struct lttng_uri *dst_uri = nullptr;
677
678 /* Code flow error safety net. */
679 LTTNG_ASSERT(output);
680 LTTNG_ASSERT(uri);
681
682 switch (uri->stype) {
683 case LTTNG_STREAM_CONTROL:
684 dst_uri = &output->dst.net.control;
685 output->dst.net.control_isset = 1;
686 if (uri->port == 0) {
687 /* Assign default port. */
688 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
689 } else {
690 if (output->dst.net.data_isset && uri->port == output->dst.net.data.port) {
691 ret = -LTTNG_ERR_INVALID;
692 goto error;
693 }
694 }
695 DBG3("Consumer control URI set with port %d", uri->port);
696 break;
697 case LTTNG_STREAM_DATA:
698 dst_uri = &output->dst.net.data;
699 output->dst.net.data_isset = 1;
700 if (uri->port == 0) {
701 /* Assign default port. */
702 uri->port = DEFAULT_NETWORK_DATA_PORT;
703 } else {
704 if (output->dst.net.control_isset &&
705 uri->port == output->dst.net.control.port) {
706 ret = -LTTNG_ERR_INVALID;
707 goto error;
708 }
709 }
710 DBG3("Consumer data URI set with port %d", uri->port);
711 break;
712 default:
713 ERR("Set network uri type unknown %d", uri->stype);
714 ret = -LTTNG_ERR_INVALID;
715 goto error;
716 }
717
718 ret = uri_compare(dst_uri, uri);
719 if (!ret) {
720 /* Same URI, don't touch it and return success. */
721 DBG3("URI network compare are the same");
722 goto equal;
723 }
724
725 /* URIs were not equal, replacing it. */
726 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
727 output->type = CONSUMER_DST_NET;
728 if (dst_uri->stype != LTTNG_STREAM_CONTROL) {
729 /* Only the control uri needs to contain the path. */
730 goto end;
731 }
732
733 /*
734 * If the user has specified a subdir as part of the control
735 * URL, the session's base output directory is:
736 * /RELAYD_OUTPUT_PATH/HOSTNAME/USER_SPECIFIED_DIR
737 *
738 * Hence, the "base_dir" from which all stream files and
739 * session rotation chunks are created takes the form
740 * /HOSTNAME/USER_SPECIFIED_DIR
741 *
742 * If the user has not specified an output directory as part of
743 * the control URL, the base output directory has the form:
744 * /RELAYD_OUTPUT_PATH/HOSTNAME/SESSION_NAME-CREATION_TIME
745 *
746 * Hence, the "base_dir" from which all stream files and
747 * session rotation chunks are created takes the form
748 * /HOSTNAME/SESSION_NAME-CREATION_TIME
749 *
750 * Note that automatically generated session names already
751 * contain the session's creation time. In that case, the
752 * creation time is omitted to prevent it from being duplicated
753 * in the final directory hierarchy.
754 */
755 if (*uri->subdir) {
756 if (strstr(uri->subdir, "../")) {
757 ERR("Network URI subdirs are not allowed to walk up the path hierarchy");
758 ret = -LTTNG_ERR_INVALID;
759 goto error;
760 }
761 ret = snprintf(output->dst.net.base_dir,
762 sizeof(output->dst.net.base_dir),
763 "/%s/%s/",
764 session->hostname,
765 uri->subdir);
766 } else {
767 if (session->has_auto_generated_name) {
768 ret = snprintf(output->dst.net.base_dir,
769 sizeof(output->dst.net.base_dir),
770 "/%s/%s/",
771 session->hostname,
772 session->name);
773 } else {
774 char session_creation_datetime[16];
775 size_t strftime_ret;
776 struct tm *timeinfo;
777
778 timeinfo = localtime(&session->creation_time);
779 if (!timeinfo) {
780 ret = -LTTNG_ERR_FATAL;
781 goto error;
782 }
783 strftime_ret = strftime(session_creation_datetime,
784 sizeof(session_creation_datetime),
785 "%Y%m%d-%H%M%S",
786 timeinfo);
787 if (strftime_ret == 0) {
788 ERR("Failed to format session creation timestamp while setting network URI");
789 ret = -LTTNG_ERR_FATAL;
790 goto error;
791 }
792 ret = snprintf(output->dst.net.base_dir,
793 sizeof(output->dst.net.base_dir),
794 "/%s/%s-%s/",
795 session->hostname,
796 session->name,
797 session_creation_datetime);
798 }
799 }
800 if (ret >= sizeof(output->dst.net.base_dir)) {
801 ret = -LTTNG_ERR_INVALID;
802 ERR("Truncation occurred while setting network output base directory");
803 goto error;
804 } else if (ret == -1) {
805 ret = -LTTNG_ERR_INVALID;
806 PERROR("Error occurred while setting network output base directory");
807 goto error;
808 }
809
810 DBG3("Consumer set network uri base_dir path %s", output->dst.net.base_dir);
811
812end:
813 return 0;
814equal:
815 return 1;
816error:
817 return ret;
818}
819
820/*
821 * Send file descriptor to consumer via sock.
822 *
823 * The consumer socket lock must be held by the caller.
824 */
825int consumer_send_fds(struct consumer_socket *sock, const int *fds, size_t nb_fd)
826{
827 int ret;
828
829 LTTNG_ASSERT(fds);
830 LTTNG_ASSERT(sock);
831 LTTNG_ASSERT(nb_fd > 0);
832 LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY);
833
834 ret = lttcomm_send_fds_unix_sock(*sock->fd_ptr, fds, nb_fd);
835 if (ret < 0) {
836 /* The above call will print a PERROR on error. */
837 DBG("Error when sending consumer fds on sock %d", *sock->fd_ptr);
838 goto error;
839 }
840
841 ret = consumer_recv_status_reply(sock);
842error:
843 return ret;
844}
845
846/*
847 * Consumer send communication message structure to consumer.
848 *
849 * The consumer socket lock must be held by the caller.
850 */
851int consumer_send_msg(struct consumer_socket *sock, const struct lttcomm_consumer_msg *msg)
852{
853 int ret;
854
855 LTTNG_ASSERT(msg);
856 LTTNG_ASSERT(sock);
857 LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY);
858
859 ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg));
860 if (ret < 0) {
861 goto error;
862 }
863
864 ret = consumer_recv_status_reply(sock);
865
866error:
867 return ret;
868}
869
870/*
871 * Consumer send channel communication message structure to consumer.
872 *
873 * The consumer socket lock must be held by the caller.
874 */
875int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg)
876{
877 int ret;
878
879 LTTNG_ASSERT(msg);
880 LTTNG_ASSERT(sock);
881
882 ret = consumer_send_msg(sock, msg);
883 if (ret < 0) {
884 goto error;
885 }
886
887error:
888 return ret;
889}
890
891/*
892 * Populate the given consumer msg structure with the ask_channel command
893 * information.
894 */
895void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
896 uint64_t subbuf_size,
897 uint64_t num_subbuf,
898 int overwrite,
899 unsigned int switch_timer_interval,
900 unsigned int read_timer_interval,
901 unsigned int live_timer_interval,
902 bool is_in_live_session,
903 unsigned int monitor_timer_interval,
904 int output,
905 int type,
906 uint64_t session_id,
907 const char *pathname,
908 const char *name,
909 uint64_t relayd_id,
910 uint64_t key,
911 const lttng_uuid& uuid,
912 uint32_t chan_id,
913 uint64_t tracefile_size,
914 uint64_t tracefile_count,
915 uint64_t session_id_per_pid,
916 unsigned int monitor,
917 uint32_t ust_app_uid,
918 int64_t blocking_timeout,
919 const char *root_shm_path,
920 const char *shm_path,
921 struct lttng_trace_chunk *trace_chunk,
922 const struct lttng_credentials *buffer_credentials)
923{
924 LTTNG_ASSERT(msg);
925
926 /* Zeroed structure */
927 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
928 msg->u.ask_channel.buffer_credentials.uid = UINT32_MAX;
929 msg->u.ask_channel.buffer_credentials.gid = UINT32_MAX;
930
931 if (trace_chunk) {
932 uint64_t chunk_id;
933 enum lttng_trace_chunk_status chunk_status;
934
935 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
936 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
937 LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id);
938 }
939 msg->u.ask_channel.buffer_credentials.uid = lttng_credentials_get_uid(buffer_credentials);
940 msg->u.ask_channel.buffer_credentials.gid = lttng_credentials_get_gid(buffer_credentials);
941
942 msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION;
943 msg->u.ask_channel.subbuf_size = subbuf_size;
944 msg->u.ask_channel.num_subbuf = num_subbuf;
945 msg->u.ask_channel.overwrite = overwrite;
946 msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
947 msg->u.ask_channel.read_timer_interval = read_timer_interval;
948 msg->u.ask_channel.live_timer_interval = live_timer_interval;
949 msg->u.ask_channel.is_live = is_in_live_session;
950 msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval;
951 msg->u.ask_channel.output = output;
952 msg->u.ask_channel.type = type;
953 msg->u.ask_channel.session_id = session_id;
954 msg->u.ask_channel.session_id_per_pid = session_id_per_pid;
955 msg->u.ask_channel.relayd_id = relayd_id;
956 msg->u.ask_channel.key = key;
957 msg->u.ask_channel.chan_id = chan_id;
958 msg->u.ask_channel.tracefile_size = tracefile_size;
959 msg->u.ask_channel.tracefile_count = tracefile_count;
960 msg->u.ask_channel.monitor = monitor;
961 msg->u.ask_channel.ust_app_uid = ust_app_uid;
962 msg->u.ask_channel.blocking_timeout = blocking_timeout;
963
964 std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid);
965
966 if (pathname) {
967 strncpy(msg->u.ask_channel.pathname, pathname, sizeof(msg->u.ask_channel.pathname));
968 msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname) - 1] = '\0';
969 }
970
971 strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
972 msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
973
974 if (root_shm_path) {
975 strncpy(msg->u.ask_channel.root_shm_path,
976 root_shm_path,
977 sizeof(msg->u.ask_channel.root_shm_path));
978 msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] =
979 '\0';
980 }
981 if (shm_path) {
982 strncpy(msg->u.ask_channel.shm_path, shm_path, sizeof(msg->u.ask_channel.shm_path));
983 msg->u.ask_channel.shm_path[sizeof(msg->u.ask_channel.shm_path) - 1] = '\0';
984 }
985}
986
987/*
988 * Init channel communication message structure.
989 */
990void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
991 uint64_t channel_key,
992 uint64_t session_id,
993 const char *pathname,
994 uint64_t relayd_id,
995 const char *name,
996 unsigned int nb_init_streams,
997 enum lttng_event_output output,
998 int type,
999 uint64_t tracefile_size,
1000 uint64_t tracefile_count,
1001 unsigned int monitor,
1002 unsigned int live_timer_interval,
1003 bool is_in_live_session,
1004 unsigned int monitor_timer_interval,
1005 struct lttng_trace_chunk *trace_chunk)
1006{
1007 LTTNG_ASSERT(msg);
1008
1009 /* Zeroed structure */
1010 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1011
1012 if (trace_chunk) {
1013 uint64_t chunk_id;
1014 enum lttng_trace_chunk_status chunk_status;
1015
1016 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
1017 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
1018 LTTNG_OPTIONAL_SET(&msg->u.channel.chunk_id, chunk_id);
1019 }
1020
1021 /* Send channel */
1022 msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
1023 msg->u.channel.channel_key = channel_key;
1024 msg->u.channel.session_id = session_id;
1025 msg->u.channel.relayd_id = relayd_id;
1026 msg->u.channel.nb_init_streams = nb_init_streams;
1027 msg->u.channel.output = output;
1028 msg->u.channel.type = type;
1029 msg->u.channel.tracefile_size = tracefile_size;
1030 msg->u.channel.tracefile_count = tracefile_count;
1031 msg->u.channel.monitor = monitor;
1032 msg->u.channel.live_timer_interval = live_timer_interval;
1033 msg->u.channel.is_live = is_in_live_session;
1034 msg->u.channel.monitor_timer_interval = monitor_timer_interval;
1035
1036 strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname));
1037 msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0';
1038
1039 strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name));
1040 msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0';
1041}
1042
1043/*
1044 * Init stream communication message structure.
1045 */
1046void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
1047 uint64_t channel_key,
1048 uint64_t stream_key,
1049 int32_t cpu)
1050{
1051 LTTNG_ASSERT(msg);
1052
1053 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1054
1055 msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM;
1056 msg->u.stream.channel_key = channel_key;
1057 msg->u.stream.stream_key = stream_key;
1058 msg->u.stream.cpu = cpu;
1059}
1060
1061void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
1062 enum lttng_consumer_command cmd,
1063 uint64_t channel_key,
1064 uint64_t net_seq_idx)
1065{
1066 LTTNG_ASSERT(msg);
1067
1068 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1069
1070 msg->cmd_type = cmd;
1071 msg->u.sent_streams.channel_key = channel_key;
1072 msg->u.sent_streams.net_seq_idx = net_seq_idx;
1073}
1074
1075/*
1076 * Send stream communication structure to the consumer.
1077 */
1078int consumer_send_stream(struct consumer_socket *sock,
1079 struct consumer_output *dst,
1080 struct lttcomm_consumer_msg *msg,
1081 const int *fds,
1082 size_t nb_fd)
1083{
1084 int ret;
1085
1086 LTTNG_ASSERT(msg);
1087 LTTNG_ASSERT(dst);
1088 LTTNG_ASSERT(sock);
1089 LTTNG_ASSERT(fds);
1090
1091 ret = consumer_send_msg(sock, msg);
1092 if (ret < 0) {
1093 goto error;
1094 }
1095
1096 ret = consumer_send_fds(sock, fds, nb_fd);
1097 if (ret < 0) {
1098 goto error;
1099 }
1100
1101error:
1102 return ret;
1103}
1104
1105/*
1106 * Send relayd socket to consumer associated with a session name.
1107 *
1108 * The consumer socket lock must be held by the caller.
1109 *
1110 * On success return positive value. On error, negative value.
1111 */
1112int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
1113 struct lttcomm_relayd_sock *rsock,
1114 struct consumer_output *consumer,
1115 enum lttng_stream_type type,
1116 uint64_t session_id,
1117 const char *session_name,
1118 const char *hostname,
1119 const char *base_path,
1120 int session_live_timer,
1121 const uint64_t *current_chunk_id,
1122 time_t session_creation_time,
1123 bool session_name_contains_creation_time)
1124{
1125 int ret;
1126 int fd;
1127 struct lttcomm_consumer_msg msg;
1128
1129 /* Code flow error. Safety net. */
1130 LTTNG_ASSERT(rsock);
1131 LTTNG_ASSERT(consumer);
1132 LTTNG_ASSERT(consumer_sock);
1133
1134 memset(&msg, 0, sizeof(msg));
1135 /* Bail out if consumer is disabled */
1136 if (!consumer->enabled) {
1137 ret = LTTNG_OK;
1138 goto error;
1139 }
1140
1141 if (type == LTTNG_STREAM_CONTROL) {
1142 char output_path[LTTNG_PATH_MAX] = {};
1143 uint64_t relayd_session_id;
1144
1145 ret = relayd_create_session(rsock,
1146 &relayd_session_id,
1147 session_name,
1148 hostname,
1149 base_path,
1150 session_live_timer,
1151 consumer->snapshot,
1152 session_id,
1153 the_sessiond_uuid,
1154 current_chunk_id,
1155 session_creation_time,
1156 session_name_contains_creation_time,
1157 output_path);
1158 if (ret < 0) {
1159 /* Close the control socket. */
1160 (void) relayd_close(rsock);
1161 goto error;
1162 }
1163 msg.u.relayd_sock.relayd_session_id = relayd_session_id;
1164 DBG("Created session on relay, output path reply: %s", output_path);
1165 }
1166
1167 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
1168 /*
1169 * Assign network consumer output index using the temporary consumer since
1170 * this call should only be made from within a set_consumer_uri() function
1171 * call in the session daemon.
1172 */
1173 msg.u.relayd_sock.net_index = consumer->net_seq_index;
1174 msg.u.relayd_sock.type = type;
1175 msg.u.relayd_sock.session_id = session_id;
1176 msg.u.relayd_sock.major = rsock->major;
1177 msg.u.relayd_sock.minor = rsock->minor;
1178 msg.u.relayd_sock.relayd_socket_protocol = rsock->sock.proto;
1179
1180 DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd_ptr);
1181 ret = consumer_send_msg(consumer_sock, &msg);
1182 if (ret < 0) {
1183 goto error;
1184 }
1185
1186 DBG3("Sending relayd socket file descriptor to consumer");
1187 fd = rsock->sock.fd;
1188 ret = consumer_send_fds(consumer_sock, &fd, 1);
1189 if (ret < 0) {
1190 goto error;
1191 }
1192
1193 DBG2("Consumer relayd socket sent");
1194
1195error:
1196 return ret;
1197}
1198
1199static int
1200consumer_send_pipe(struct consumer_socket *consumer_sock, enum lttng_consumer_command cmd, int pipe)
1201{
1202 int ret;
1203 struct lttcomm_consumer_msg msg;
1204 const char *pipe_name;
1205 const char *command_name;
1206
1207 switch (cmd) {
1208 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1209 pipe_name = "channel monitor";
1210 command_name = "SET_CHANNEL_MONITOR_PIPE";
1211 break;
1212 default:
1213 ERR("Unexpected command received in %s (cmd = %d)", __func__, (int) cmd);
1214 abort();
1215 }
1216
1217 /* Code flow error. Safety net. */
1218
1219 memset(&msg, 0, sizeof(msg));
1220 msg.cmd_type = cmd;
1221
1222 pthread_mutex_lock(consumer_sock->lock);
1223 DBG3("Sending %s command to consumer", command_name);
1224 ret = consumer_send_msg(consumer_sock, &msg);
1225 if (ret < 0) {
1226 goto error;
1227 }
1228
1229 DBG3("Sending %s pipe %d to consumer on socket %d", pipe_name, pipe, *consumer_sock->fd_ptr);
1230 ret = consumer_send_fds(consumer_sock, &pipe, 1);
1231 if (ret < 0) {
1232 goto error;
1233 }
1234
1235 DBG2("%s pipe successfully sent", pipe_name);
1236error:
1237 pthread_mutex_unlock(consumer_sock->lock);
1238 return ret;
1239}
1240
1241int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, int pipe)
1242{
1243 return consumer_send_pipe(consumer_sock, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe);
1244}
1245
1246/*
1247 * Ask the consumer if the data is pending for the specific session id.
1248 * Returns 1 if data is pending, 0 otherwise, or < 0 on error.
1249 */
1250int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer)
1251{
1252 int ret;
1253 int32_t ret_code = 0; /* Default is that the data is NOT pending */
1254 struct lttcomm_consumer_msg msg;
1255
1256 LTTNG_ASSERT(consumer);
1257
1258 DBG3("Consumer data pending for id %" PRIu64, session_id);
1259
1260 memset(&msg, 0, sizeof(msg));
1261 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
1262 msg.u.data_pending.session_id = session_id;
1263
1264 for (auto *socket :
1265 lttng::urcu::lfht_iteration_adapter<consumer_socket,
1266 decltype(consumer_socket::node),
1267 &consumer_socket::node>(*consumer->socks->ht)) {
1268 pthread_mutex_lock(socket->lock);
1269 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1270 if (ret < 0) {
1271 pthread_mutex_unlock(socket->lock);
1272 goto error_unlock;
1273 }
1274
1275 /*
1276 * No need for a recv reply status because the answer to the command is
1277 * the reply status message.
1278 */
1279 ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
1280 if (ret < 0) {
1281 pthread_mutex_unlock(socket->lock);
1282 goto error_unlock;
1283 }
1284
1285 pthread_mutex_unlock(socket->lock);
1286
1287 if (ret_code == 1) {
1288 break;
1289 }
1290 }
1291
1292 DBG("Consumer data is %s pending for session id %" PRIu64,
1293 ret_code == 1 ? "" : "NOT",
1294 session_id);
1295 return ret_code;
1296
1297error_unlock:
1298 return -1;
1299}
1300
1301/*
1302 * Send a flush command to consumer using the given channel key.
1303 *
1304 * Return 0 on success else a negative value.
1305 */
1306int consumer_flush_channel(struct consumer_socket *socket, uint64_t key)
1307{
1308 int ret;
1309 struct lttcomm_consumer_msg msg;
1310
1311 LTTNG_ASSERT(socket);
1312
1313 DBG2("Consumer flush channel key %" PRIu64, key);
1314
1315 memset(&msg, 0, sizeof(msg));
1316 msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
1317 msg.u.flush_channel.key = key;
1318
1319 pthread_mutex_lock(socket->lock);
1320 health_code_update();
1321
1322 ret = consumer_send_msg(socket, &msg);
1323 if (ret < 0) {
1324 goto end;
1325 }
1326
1327end:
1328 health_code_update();
1329 pthread_mutex_unlock(socket->lock);
1330 return ret;
1331}
1332
1333/*
1334 * Send a clear quiescent command to consumer using the given channel key.
1335 *
1336 * Return 0 on success else a negative value.
1337 */
1338int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
1339{
1340 int ret;
1341 struct lttcomm_consumer_msg msg;
1342
1343 LTTNG_ASSERT(socket);
1344
1345 DBG2("Consumer clear quiescent channel key %" PRIu64, key);
1346
1347 memset(&msg, 0, sizeof(msg));
1348 msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
1349 msg.u.clear_quiescent_channel.key = key;
1350
1351 pthread_mutex_lock(socket->lock);
1352 health_code_update();
1353
1354 ret = consumer_send_msg(socket, &msg);
1355 if (ret < 0) {
1356 goto end;
1357 }
1358
1359end:
1360 health_code_update();
1361 pthread_mutex_unlock(socket->lock);
1362 return ret;
1363}
1364
1365/*
1366 * Send a close metadata command to consumer using the given channel key.
1367 * Called with registry lock held.
1368 *
1369 * Return 0 on success else a negative value.
1370 */
1371int consumer_close_metadata(struct consumer_socket *socket, uint64_t metadata_key)
1372{
1373 int ret;
1374 struct lttcomm_consumer_msg msg;
1375
1376 LTTNG_ASSERT(socket);
1377
1378 DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
1379
1380 memset(&msg, 0, sizeof(msg));
1381 msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
1382 msg.u.close_metadata.key = metadata_key;
1383
1384 pthread_mutex_lock(socket->lock);
1385 health_code_update();
1386
1387 ret = consumer_send_msg(socket, &msg);
1388 if (ret < 0) {
1389 goto end;
1390 }
1391
1392end:
1393 health_code_update();
1394 pthread_mutex_unlock(socket->lock);
1395 return ret;
1396}
1397
1398/*
1399 * Send a setup metdata command to consumer using the given channel key.
1400 *
1401 * Return 0 on success else a negative value.
1402 */
1403int consumer_setup_metadata(struct consumer_socket *socket, uint64_t metadata_key)
1404{
1405 int ret;
1406 struct lttcomm_consumer_msg msg;
1407
1408 LTTNG_ASSERT(socket);
1409
1410 DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
1411
1412 memset(&msg, 0, sizeof(msg));
1413 msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
1414 msg.u.setup_metadata.key = metadata_key;
1415
1416 pthread_mutex_lock(socket->lock);
1417 health_code_update();
1418
1419 ret = consumer_send_msg(socket, &msg);
1420 if (ret < 0) {
1421 goto end;
1422 }
1423
1424end:
1425 health_code_update();
1426 pthread_mutex_unlock(socket->lock);
1427 return ret;
1428}
1429
1430/*
1431 * Send metadata string to consumer.
1432 * RCU read-side lock must be held to guarantee existence of socket.
1433 *
1434 * Return 0 on success else a negative value.
1435 */
1436int consumer_push_metadata(struct consumer_socket *socket,
1437 uint64_t metadata_key,
1438 char *metadata_str,
1439 size_t len,
1440 size_t target_offset,
1441 uint64_t version)
1442{
1443 int ret;
1444 struct lttcomm_consumer_msg msg;
1445
1446 LTTNG_ASSERT(socket);
1447 ASSERT_RCU_READ_LOCKED();
1448
1449 DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr);
1450
1451 pthread_mutex_lock(socket->lock);
1452
1453 memset(&msg, 0, sizeof(msg));
1454 msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
1455 msg.u.push_metadata.key = metadata_key;
1456 msg.u.push_metadata.target_offset = target_offset;
1457 msg.u.push_metadata.len = len;
1458 msg.u.push_metadata.version = version;
1459
1460 health_code_update();
1461 ret = consumer_send_msg(socket, &msg);
1462 if (ret < 0 || len == 0) {
1463 goto end;
1464 }
1465
1466 DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd_ptr, len);
1467
1468 ret = consumer_socket_send(socket, metadata_str, len);
1469 if (ret < 0) {
1470 goto end;
1471 }
1472
1473 health_code_update();
1474 ret = consumer_recv_status_reply(socket);
1475 if (ret < 0) {
1476 goto end;
1477 }
1478
1479end:
1480 pthread_mutex_unlock(socket->lock);
1481 health_code_update();
1482 return ret;
1483}
1484
1485/*
1486 * Ask the consumer to snapshot a specific channel using the key.
1487 *
1488 * Returns LTTNG_OK on success or else an LTTng error code.
1489 */
1490enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
1491 uint64_t key,
1492 const struct consumer_output *output,
1493 int metadata,
1494 const char *channel_path,
1495 uint64_t nb_packets_per_stream)
1496{
1497 int ret;
1498 enum lttng_error_code status = LTTNG_OK;
1499 struct lttcomm_consumer_msg msg;
1500
1501 LTTNG_ASSERT(socket);
1502 LTTNG_ASSERT(output);
1503
1504 DBG("Consumer snapshot channel key %" PRIu64, key);
1505
1506 memset(&msg, 0, sizeof(msg));
1507 msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
1508 msg.u.snapshot_channel.key = key;
1509 msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
1510 msg.u.snapshot_channel.metadata = metadata;
1511
1512 if (output->type == CONSUMER_DST_NET) {
1513 msg.u.snapshot_channel.relayd_id = output->net_seq_index;
1514 msg.u.snapshot_channel.use_relayd = 1;
1515 } else {
1516 msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
1517 }
1518 ret = lttng_strncpy(msg.u.snapshot_channel.pathname,
1519 channel_path,
1520 sizeof(msg.u.snapshot_channel.pathname));
1521 if (ret < 0) {
1522 ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%zu bytes required) with path \"%s\"",
1523 sizeof(msg.u.snapshot_channel.pathname),
1524 strlen(channel_path),
1525 channel_path);
1526 status = LTTNG_ERR_SNAPSHOT_FAIL;
1527 goto error;
1528 }
1529
1530 health_code_update();
1531 pthread_mutex_lock(socket->lock);
1532 ret = consumer_send_msg(socket, &msg);
1533 pthread_mutex_unlock(socket->lock);
1534 if (ret < 0) {
1535 switch (-ret) {
1536 case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
1537 status = LTTNG_ERR_CHAN_NOT_FOUND;
1538 break;
1539 default:
1540 status = LTTNG_ERR_SNAPSHOT_FAIL;
1541 break;
1542 }
1543 goto error;
1544 }
1545
1546error:
1547 health_code_update();
1548 return status;
1549}
1550
1551/*
1552 * Ask the consumer the number of discarded events for a channel.
1553 */
1554int consumer_get_discarded_events(uint64_t session_id,
1555 uint64_t channel_key,
1556 struct consumer_output *consumer,
1557 uint64_t *discarded)
1558{
1559 int ret;
1560 struct lttcomm_consumer_msg msg;
1561
1562 LTTNG_ASSERT(consumer);
1563
1564 DBG3("Consumer discarded events id %" PRIu64, session_id);
1565
1566 memset(&msg, 0, sizeof(msg));
1567 msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
1568 msg.u.discarded_events.session_id = session_id;
1569 msg.u.discarded_events.channel_key = channel_key;
1570
1571 *discarded = 0;
1572
1573 /* Send command for each consumer. */
1574 for (auto *socket :
1575 lttng::urcu::lfht_iteration_adapter<consumer_socket,
1576 decltype(consumer_socket::node),
1577 &consumer_socket::node>(*consumer->socks->ht)) {
1578 uint64_t consumer_discarded = 0;
1579
1580 pthread_mutex_lock(socket->lock);
1581 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1582 if (ret < 0) {
1583 pthread_mutex_unlock(socket->lock);
1584 goto end;
1585 }
1586
1587 /*
1588 * No need for a recv reply status because the answer to the
1589 * command is the reply status message.
1590 */
1591 ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
1592 if (ret < 0) {
1593 ERR("get discarded events");
1594 pthread_mutex_unlock(socket->lock);
1595 goto end;
1596 }
1597
1598 pthread_mutex_unlock(socket->lock);
1599 *discarded += consumer_discarded;
1600 }
1601
1602 ret = 0;
1603 DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id);
1604
1605end:
1606 return ret;
1607}
1608
1609/*
1610 * Ask the consumer the number of lost packets for a channel.
1611 */
1612int consumer_get_lost_packets(uint64_t session_id,
1613 uint64_t channel_key,
1614 struct consumer_output *consumer,
1615 uint64_t *lost)
1616{
1617 int ret;
1618 struct lttcomm_consumer_msg msg;
1619
1620 LTTNG_ASSERT(consumer);
1621
1622 DBG3("Consumer lost packets id %" PRIu64, session_id);
1623
1624 memset(&msg, 0, sizeof(msg));
1625 msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
1626 msg.u.lost_packets.session_id = session_id;
1627 msg.u.lost_packets.channel_key = channel_key;
1628
1629 *lost = 0;
1630
1631 /* Send command for each consumer. */
1632 for (auto *socket :
1633 lttng::urcu::lfht_iteration_adapter<consumer_socket,
1634 decltype(consumer_socket::node),
1635 &consumer_socket::node>(*consumer->socks->ht)) {
1636 uint64_t consumer_lost = 0;
1637 pthread_mutex_lock(socket->lock);
1638 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1639 if (ret < 0) {
1640 pthread_mutex_unlock(socket->lock);
1641 goto end;
1642 }
1643
1644 /*
1645 * No need for a recv reply status because the answer to the
1646 * command is the reply status message.
1647 */
1648 ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
1649 if (ret < 0) {
1650 ERR("get lost packets");
1651 pthread_mutex_unlock(socket->lock);
1652 goto end;
1653 }
1654 pthread_mutex_unlock(socket->lock);
1655 *lost += consumer_lost;
1656 }
1657
1658 ret = 0;
1659 DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id);
1660
1661end:
1662 return ret;
1663}
1664
1665/*
1666 * Ask the consumer to rotate a channel.
1667 *
1668 * The new_chunk_id is the session->rotate_count that has been incremented
1669 * when the rotation started. On the relay, this allows to keep track in which
1670 * chunk each stream is currently writing to (for the rotate_pending operation).
1671 */
1672int consumer_rotate_channel(struct consumer_socket *socket,
1673 uint64_t key,
1674 struct consumer_output *output,
1675 bool is_metadata_channel)
1676{
1677 int ret;
1678 struct lttcomm_consumer_msg msg;
1679
1680 LTTNG_ASSERT(socket);
1681
1682 DBG("Consumer rotate channel key %" PRIu64, key);
1683
1684 pthread_mutex_lock(socket->lock);
1685 memset(&msg, 0, sizeof(msg));
1686 msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL;
1687 msg.u.rotate_channel.key = key;
1688 msg.u.rotate_channel.metadata = !!is_metadata_channel;
1689
1690 if (output->type == CONSUMER_DST_NET) {
1691 msg.u.rotate_channel.relayd_id = output->net_seq_index;
1692 } else {
1693 msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
1694 }
1695
1696 health_code_update();
1697 ret = consumer_send_msg(socket, &msg);
1698 if (ret < 0) {
1699 switch (-ret) {
1700 case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
1701 ret = -LTTNG_ERR_CHAN_NOT_FOUND;
1702 break;
1703 default:
1704 ret = -LTTNG_ERR_ROTATION_FAIL_CONSUMER;
1705 break;
1706 }
1707 goto error;
1708 }
1709error:
1710 pthread_mutex_unlock(socket->lock);
1711 health_code_update();
1712 return ret;
1713}
1714
1715int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key)
1716{
1717 int ret;
1718 lttcomm_consumer_msg msg = {
1719 .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
1720 .u = {},
1721 };
1722 msg.u.open_channel_packets.key = key;
1723
1724 LTTNG_ASSERT(socket);
1725
1726 DBG("Consumer open channel packets: channel key = %" PRIu64, key);
1727
1728 health_code_update();
1729
1730 pthread_mutex_lock(socket->lock);
1731 ret = consumer_send_msg(socket, &msg);
1732 pthread_mutex_unlock(socket->lock);
1733 if (ret < 0) {
1734 goto error_socket;
1735 }
1736
1737error_socket:
1738 health_code_update();
1739 return ret;
1740}
1741
1742int consumer_clear_channel(struct consumer_socket *socket, uint64_t key)
1743{
1744 int ret;
1745 struct lttcomm_consumer_msg msg;
1746
1747 LTTNG_ASSERT(socket);
1748
1749 DBG("Consumer clear channel %" PRIu64, key);
1750
1751 memset(&msg, 0, sizeof(msg));
1752 msg.cmd_type = LTTNG_CONSUMER_CLEAR_CHANNEL;
1753 msg.u.clear_channel.key = key;
1754
1755 health_code_update();
1756
1757 pthread_mutex_lock(socket->lock);
1758 ret = consumer_send_msg(socket, &msg);
1759 if (ret < 0) {
1760 goto error_socket;
1761 }
1762
1763error_socket:
1764 pthread_mutex_unlock(socket->lock);
1765
1766 health_code_update();
1767 return ret;
1768}
1769
1770int consumer_init(struct consumer_socket *socket, const lttng_uuid& sessiond_uuid)
1771{
1772 int ret;
1773 struct lttcomm_consumer_msg msg = {
1774 .cmd_type = LTTNG_CONSUMER_INIT,
1775 .u = {},
1776 };
1777
1778 LTTNG_ASSERT(socket);
1779
1780 DBG("Sending consumer initialization command");
1781 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg.u.init.sessiond_uuid);
1782
1783 health_code_update();
1784 ret = consumer_send_msg(socket, &msg);
1785 if (ret < 0) {
1786 goto error;
1787 }
1788
1789error:
1790 health_code_update();
1791 return ret;
1792}
1793
1794/*
1795 * Ask the consumer to create a new chunk for a given session.
1796 *
1797 * Called with the consumer socket lock held.
1798 */
1799int consumer_create_trace_chunk(struct consumer_socket *socket,
1800 uint64_t relayd_id,
1801 uint64_t session_id,
1802 struct lttng_trace_chunk *chunk,
1803 const char *domain_subdir)
1804{
1805 int ret;
1806 enum lttng_trace_chunk_status chunk_status;
1807 struct lttng_credentials chunk_credentials;
1808 const struct lttng_directory_handle *chunk_directory_handle = nullptr;
1809 struct lttng_directory_handle *domain_handle = nullptr;
1810 int domain_dirfd;
1811 const char *chunk_name;
1812 bool chunk_name_overridden;
1813 uint64_t chunk_id;
1814 time_t creation_timestamp;
1815 char creation_timestamp_buffer[ISO8601_STR_LEN];
1816 const char *creation_timestamp_str = "(none)";
1817 const bool chunk_has_local_output = relayd_id == -1ULL;
1818 enum lttng_trace_chunk_status tc_status;
1819 struct lttcomm_consumer_msg msg = {
1820 .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
1821 .u = {},
1822 };
1823 msg.u.create_trace_chunk.session_id = session_id;
1824
1825 LTTNG_ASSERT(socket);
1826 LTTNG_ASSERT(chunk);
1827
1828 if (relayd_id != -1ULL) {
1829 LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, relayd_id);
1830 }
1831
1832 chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, &chunk_name_overridden);
1833 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1834 chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1835 ERR("Failed to get name of trace chunk");
1836 ret = -LTTNG_ERR_FATAL;
1837 goto error;
1838 }
1839 if (chunk_name_overridden) {
1840 ret = lttng_strncpy(msg.u.create_trace_chunk.override_name,
1841 chunk_name,
1842 sizeof(msg.u.create_trace_chunk.override_name));
1843 if (ret) {
1844 ERR("Trace chunk name \"%s\" exceeds the maximal length allowed by the consumer protocol",
1845 chunk_name);
1846 ret = -LTTNG_ERR_FATAL;
1847 goto error;
1848 }
1849 }
1850
1851 chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, &creation_timestamp);
1852 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1853 ret = -LTTNG_ERR_FATAL;
1854 goto error;
1855 }
1856 msg.u.create_trace_chunk.creation_timestamp = (uint64_t) creation_timestamp;
1857 /* Only used for logging purposes. */
1858 ret = time_to_iso8601_str(
1859 creation_timestamp, creation_timestamp_buffer, sizeof(creation_timestamp_buffer));
1860 creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
1861
1862 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1863 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1864 /*
1865 * Anonymous trace chunks should never be transmitted
1866 * to remote peers (consumerd and relayd). They are used
1867 * internally for backward-compatibility purposes.
1868 */
1869 ret = -LTTNG_ERR_FATAL;
1870 goto error;
1871 }
1872 msg.u.create_trace_chunk.chunk_id = chunk_id;
1873
1874 if (chunk_has_local_output) {
1875 chunk_status = lttng_trace_chunk_borrow_chunk_directory_handle(
1876 chunk, &chunk_directory_handle);
1877 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1878 ret = -LTTNG_ERR_FATAL;
1879 goto error;
1880 }
1881 chunk_status = lttng_trace_chunk_get_credentials(chunk, &chunk_credentials);
1882 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1883 /*
1884 * Not associating credentials to a sessiond chunk is a
1885 * fatal internal error.
1886 */
1887 ret = -LTTNG_ERR_FATAL;
1888 goto error;
1889 }
1890 tc_status = lttng_trace_chunk_create_subdirectory(chunk, domain_subdir);
1891 if (tc_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1892 PERROR("Failed to create chunk domain output directory \"%s\"",
1893 domain_subdir);
1894 ret = -LTTNG_ERR_FATAL;
1895 goto error;
1896 }
1897 domain_handle = lttng_directory_handle_create_from_handle(domain_subdir,
1898 chunk_directory_handle);
1899 if (!domain_handle) {
1900 ret = -LTTNG_ERR_FATAL;
1901 goto error;
1902 }
1903
1904 /*
1905 * This will only compile on platforms that support
1906 * dirfd (POSIX.2008). This is fine as the session daemon
1907 * is only built for such platforms.
1908 *
1909 * The ownership of the chunk directory handle's is maintained
1910 * by the trace chunk.
1911 */
1912 domain_dirfd = lttng_directory_handle_get_dirfd(domain_handle);
1913 LTTNG_ASSERT(domain_dirfd >= 0);
1914
1915 msg.u.create_trace_chunk.credentials.value.uid =
1916 lttng_credentials_get_uid(&chunk_credentials);
1917 msg.u.create_trace_chunk.credentials.value.gid =
1918 lttng_credentials_get_gid(&chunk_credentials);
1919 msg.u.create_trace_chunk.credentials.is_set = 1;
1920 }
1921
1922 DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64
1923 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", creation_timestamp = %s",
1924 relayd_id,
1925 session_id,
1926 chunk_id,
1927 creation_timestamp_str);
1928 health_code_update();
1929 ret = consumer_send_msg(socket, &msg);
1930 health_code_update();
1931 if (ret < 0) {
1932 ERR("Trace chunk creation error on consumer");
1933 ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
1934 goto error;
1935 }
1936
1937 if (chunk_has_local_output) {
1938 DBG("Sending trace chunk domain directory fd to consumer");
1939 health_code_update();
1940 ret = consumer_send_fds(socket, &domain_dirfd, 1);
1941 health_code_update();
1942 if (ret < 0) {
1943 ERR("Trace chunk creation error on consumer");
1944 ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
1945 goto error;
1946 }
1947 }
1948error:
1949 lttng_directory_handle_put(domain_handle);
1950 return ret;
1951}
1952
1953/*
1954 * Ask the consumer to close a trace chunk for a given session.
1955 *
1956 * Called with the consumer socket lock held.
1957 */
1958int consumer_close_trace_chunk(struct consumer_socket *socket,
1959 uint64_t relayd_id,
1960 uint64_t session_id,
1961 struct lttng_trace_chunk *chunk,
1962 char *closed_trace_chunk_path)
1963{
1964 int ret;
1965 enum lttng_trace_chunk_status chunk_status;
1966 lttcomm_consumer_msg msg = {
1967 .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
1968 .u = {},
1969 };
1970 msg.u.close_trace_chunk.session_id = session_id;
1971
1972 struct lttcomm_consumer_close_trace_chunk_reply reply;
1973 uint64_t chunk_id;
1974 time_t close_timestamp;
1975 enum lttng_trace_chunk_command_type close_command;
1976 const char *close_command_name = "none";
1977 struct lttng_dynamic_buffer path_reception_buffer;
1978
1979 LTTNG_ASSERT(socket);
1980 lttng_dynamic_buffer_init(&path_reception_buffer);
1981
1982 if (relayd_id != -1ULL) {
1983 LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id, relayd_id);
1984 }
1985
1986 chunk_status = lttng_trace_chunk_get_close_command(chunk, &close_command);
1987 switch (chunk_status) {
1988 case LTTNG_TRACE_CHUNK_STATUS_OK:
1989 LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command,
1990 (uint32_t) close_command);
1991 break;
1992 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1993 break;
1994 default:
1995 ERR("Failed to get trace chunk close command");
1996 ret = -1;
1997 goto error;
1998 }
1999
2000 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
2001 /*
2002 * Anonymous trace chunks should never be transmitted to remote peers
2003 * (consumerd and relayd). They are used internally for
2004 * backward-compatibility purposes.
2005 */
2006 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
2007 msg.u.close_trace_chunk.chunk_id = chunk_id;
2008
2009 chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
2010 /*
2011 * A trace chunk should be closed locally before being closed remotely.
2012 * Otherwise, the close timestamp would never be transmitted to the
2013 * peers.
2014 */
2015 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
2016 msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp;
2017
2018 if (msg.u.close_trace_chunk.close_command.is_set) {
2019 close_command_name = lttng_trace_chunk_command_type_get_name(close_command);
2020 }
2021 DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64
2022 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = \"%s\"",
2023 relayd_id,
2024 session_id,
2025 chunk_id,
2026 close_command_name);
2027
2028 health_code_update();
2029 ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg));
2030 if (ret < 0) {
2031 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2032 goto error;
2033 }
2034 ret = consumer_socket_recv(socket, &reply, sizeof(reply));
2035 if (ret < 0) {
2036 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2037 goto error;
2038 }
2039 if (reply.path_length >= LTTNG_PATH_MAX) {
2040 ERR("Invalid path returned by relay daemon: %" PRIu32
2041 "bytes exceeds maximal allowed length of %d bytes",
2042 reply.path_length,
2043 LTTNG_PATH_MAX);
2044 ret = -LTTNG_ERR_INVALID_PROTOCOL;
2045 goto error;
2046 }
2047 ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, reply.path_length);
2048 if (ret) {
2049 ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command");
2050 ret = -LTTNG_ERR_NOMEM;
2051 goto error;
2052 }
2053 ret = consumer_socket_recv(socket, path_reception_buffer.data, path_reception_buffer.size);
2054 if (ret < 0) {
2055 ERR("Communication error while receiving path of closed trace chunk");
2056 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2057 goto error;
2058 }
2059 if (path_reception_buffer.data[path_reception_buffer.size - 1] != '\0') {
2060 ERR("Invalid path returned by relay daemon: not null-terminated");
2061 ret = -LTTNG_ERR_INVALID_PROTOCOL;
2062 goto error;
2063 }
2064 if (closed_trace_chunk_path) {
2065 /*
2066 * closed_trace_chunk_path is assumed to have a length >=
2067 * LTTNG_PATH_MAX
2068 */
2069 memcpy(closed_trace_chunk_path,
2070 path_reception_buffer.data,
2071 path_reception_buffer.size);
2072 }
2073error:
2074 lttng_dynamic_buffer_reset(&path_reception_buffer);
2075 health_code_update();
2076 return ret;
2077}
2078
2079/*
2080 * Ask the consumer if a trace chunk exists.
2081 *
2082 * Called with the consumer socket lock held.
2083 * Returns 0 on success, or a negative value on error.
2084 */
2085int consumer_trace_chunk_exists(struct consumer_socket *socket,
2086 uint64_t relayd_id,
2087 uint64_t session_id,
2088 struct lttng_trace_chunk *chunk,
2089 enum consumer_trace_chunk_exists_status *result)
2090{
2091 int ret;
2092 enum lttng_trace_chunk_status chunk_status;
2093 lttcomm_consumer_msg msg = {
2094 .cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
2095 .u = {},
2096 };
2097 msg.u.trace_chunk_exists.session_id = session_id;
2098
2099 uint64_t chunk_id;
2100 const char *consumer_reply_str;
2101
2102 LTTNG_ASSERT(socket);
2103
2104 if (relayd_id != -1ULL) {
2105 LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, relayd_id);
2106 }
2107
2108 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
2109 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
2110 /*
2111 * Anonymous trace chunks should never be transmitted
2112 * to remote peers (consumerd and relayd). They are used
2113 * internally for backward-compatibility purposes.
2114 */
2115 ret = -LTTNG_ERR_FATAL;
2116 goto error;
2117 }
2118 msg.u.trace_chunk_exists.chunk_id = chunk_id;
2119
2120 DBG("Sending consumer trace chunk exists command: relayd_id = %" PRId64
2121 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64,
2122 relayd_id,
2123 session_id,
2124 chunk_id);
2125
2126 health_code_update();
2127 ret = consumer_send_msg(socket, &msg);
2128 switch (-ret) {
2129 case LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK:
2130 consumer_reply_str = "unknown trace chunk";
2131 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK;
2132 break;
2133 case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL:
2134 consumer_reply_str = "trace chunk exists locally";
2135 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_LOCAL;
2136 break;
2137 case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE:
2138 consumer_reply_str = "trace chunk exists on remote peer";
2139 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_REMOTE;
2140 break;
2141 default:
2142 ERR("Consumer returned an error from TRACE_CHUNK_EXISTS command");
2143 ret = -1;
2144 goto error;
2145 }
2146 DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", consumer_reply_str);
2147 ret = 0;
2148error:
2149 health_code_update();
2150 return ret;
2151}
This page took 0.030689 seconds and 5 git commands to generate.