2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: LGPL-2.1-only
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
17 #include <common/payload.h>
18 #include <common/payload-view.h>
19 #include <common/unix.h>
21 #include "lttng-ctl-helper.h"
22 #include <common/compat/poll.h>
/*
 * Forward declaration of handshake(), which is defined further down in this
 * file and is called by lttng_notification_channel_create() right after the
 * channel's socket has been set.
 */
25 int handshake(struct lttng_notification_channel
*channel
);
28 * Populates the reception buffer with the next complete message.
29 * The caller must acquire the channel's lock.
32 int receive_message(struct lttng_notification_channel
*channel
)
35 struct lttng_notification_channel_message msg
;
37 lttng_payload_clear(&channel
->reception_payload
);
39 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
45 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
50 /* Add message header at buffer's start. */
51 ret
= lttng_dynamic_buffer_append(&channel
->reception_payload
.buffer
, &msg
,
61 /* Reserve space for the payload. */
62 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_payload
.buffer
,
63 channel
->reception_payload
.buffer
.size
+ msg
.size
);
68 /* Receive message payload. */
69 ret
= lttcomm_recv_unix_sock(channel
->socket
,
70 channel
->reception_payload
.buffer
.data
+ sizeof(msg
), msg
.size
);
71 if (ret
< (ssize_t
) msg
.size
) {
77 /* Receive message fds. */
79 ret
= lttcomm_recv_payload_fds_unix_sock(channel
->socket
,
80 msg
.fds
, &channel
->reception_payload
);
81 if (ret
< sizeof(int) * msg
.fds
) {
90 lttng_payload_clear(&channel
->reception_payload
);
95 enum lttng_notification_channel_message_type
get_current_message_type(
96 struct lttng_notification_channel
*channel
)
98 struct lttng_notification_channel_message
*msg
;
100 assert(channel
->reception_payload
.buffer
.size
>= sizeof(*msg
));
102 msg
= (struct lttng_notification_channel_message
*)
103 channel
->reception_payload
.buffer
.data
;
104 return (enum lttng_notification_channel_message_type
) msg
->type
;
/*
 * Builds a notification object from the message currently stored in the
 * channel's reception buffer.
 *
 * Visible steps: the buffer must be larger than a bare message header; a
 * payload view starting right after the header is handed to
 * lttng_notification_create_from_payload(); the value it returns is compared
 * with the payload length (buffer size minus header size) and the
 * notification is destroyed when they differ.
 *
 * NOTE(review): the second argument of
 * lttng_notification_create_from_payload() reads `¬ification` below, which
 * looks like a mis-decoded `&notification` (HTML entity `&not`) - confirm
 * against the upstream file.
 * NOTE(review): the declaration of `ret`, the remaining argument(s) of
 * lttng_payload_view_from_payload(), the early exits and the return
 * statement are not present in this listing.
 */
108 struct lttng_notification
*create_notification_from_current_message(
109 struct lttng_notification_channel
*channel
)
112 struct lttng_notification
*notification
= NULL
;
114 if (channel
->reception_payload
.buffer
.size
<=
115 sizeof(struct lttng_notification_channel_message
)) {
120 struct lttng_payload_view view
= lttng_payload_view_from_payload(
121 &channel
->reception_payload
,
122 sizeof(struct lttng_notification_channel_message
),
125 ret
= lttng_notification_create_from_payload(
126 &view
, ¬ification
);
129 if (ret
!= channel
->reception_payload
.buffer
.size
-
130 sizeof(struct lttng_notification_channel_message
)) {
131 lttng_notification_destroy(notification
);
/*
 * Creates a notification channel connected to the session daemon.
 *
 * Visible steps:
 *  - the endpoint is compared against
 *    lttng_session_daemon_notification_endpoint;
 *  - a LTTNG_PATH_MAX-byte socket path buffer and the channel object are
 *    allocated with zmalloc();
 *  - the channel's socket is set to -1 and its lock, reception payload and
 *    pending notification list are initialized;
 *  - when the caller is root or lttng_check_tracing_group() reports
 *    membership, the global socket path is copied and a connection is
 *    attempted;
 *  - the per-user socket path is built from the home directory with
 *    snprintf() (truncation is checked) and a connection is attempted;
 *  - the connected fd is stored in the channel and handshake() is run.
 *
 * NOTE(review): the control flow joining these steps (conditions, labels,
 * return statement, release of sock_path) is not present in this listing;
 * the call to lttng_notification_channel_destroy() at the end is presumably
 * the error path - confirm against the upstream file.
 */
139 struct lttng_notification_channel
*lttng_notification_channel_create(
140 struct lttng_endpoint
*endpoint
)
143 bool is_in_tracing_group
= false, is_root
= false;
144 char *sock_path
= NULL
;
145 struct lttng_notification_channel
*channel
= NULL
;
148 endpoint
!= lttng_session_daemon_notification_endpoint
) {
152 sock_path
= zmalloc(LTTNG_PATH_MAX
);
157 channel
= zmalloc(sizeof(struct lttng_notification_channel
));
/* -1 marks the socket as not connected; see the `socket >= 0` test in destroy. */
161 channel
->socket
= -1;
162 pthread_mutex_init(&channel
->lock
, NULL
);
163 lttng_payload_init(&channel
->reception_payload
);
164 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
166 is_root
= (getuid() == 0);
168 is_in_tracing_group
= lttng_check_tracing_group();
171 if (is_root
|| is_in_tracing_group
) {
172 ret
= lttng_strncpy(sock_path
,
173 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
176 ret
= -LTTNG_ERR_INVALID
;
180 ret
= lttcomm_connect_unix_sock(sock_path
);
187 /* Fallback to local session daemon. */
188 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
189 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
190 utils_get_home_dir());
/* A result >= LTTNG_PATH_MAX means the formatted path was truncated. */
191 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
195 ret
= lttcomm_connect_unix_sock(sock_path
);
202 channel
->socket
= fd
;
204 ret
= handshake(channel
);
212 lttng_notification_channel_destroy(channel
);
/*
 * Hands the next notification of the channel to the caller through
 * `_notification`.
 *
 * Visible steps:
 *  - a NULL `channel` or `_notification` yields
 *    LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
 *  - the channel's lock is taken;
 *  - when notifications are pending, the first list entry is removed from
 *    the list, the pending count is decremented, the entry is freed and its
 *    notification is the one handed out;
 *  - otherwise the socket is waited on with an interruptible poll (infinite
 *    timeout), a message is received and dispatched on its type:
 *    NOTIFICATION builds a notification from the message,
 *    NOTIFICATION_DROPPED sets the NOTIFICATIONS_DROPPED status, anything
 *    else is a protocol error;
 *  - the poll set is cleaned, the lock released and `*_notification` set.
 *
 * A failed poll wait maps to STATUS_INTERRUPTED when errno is EINTR and to
 * STATUS_ERROR otherwise.
 *
 * NOTE(review): the conditions guarding most status assignments, the
 * labels and the return statement are not present in this listing.
 */
217 enum lttng_notification_channel_status
218 lttng_notification_channel_get_next_notification(
219 struct lttng_notification_channel
*channel
,
220 struct lttng_notification
**_notification
)
223 struct lttng_notification
*notification
= NULL
;
224 enum lttng_notification_channel_status status
=
225 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
226 struct lttng_poll_event events
;
228 if (!channel
|| !_notification
) {
229 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
233 pthread_mutex_lock(&channel
->lock
);
235 if (channel
->pending_notifications
.count
) {
236 struct pending_notification
*pending_notification
;
238 assert(!cds_list_empty(&channel
->pending_notifications
.list
));
240 /* Deliver one of the pending notifications. */
241 pending_notification
= cds_list_first_entry(
242 &channel
->pending_notifications
.list
,
243 struct pending_notification
,
245 notification
= pending_notification
->notification
;
247 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
/* Ownership of the notification moves to the caller; only the list entry is freed. */
249 cds_list_del(&pending_notification
->node
);
250 channel
->pending_notifications
.count
--;
251 free(pending_notification
);
256 * Block on interruptible epoll/poll() instead of the message reception
257 * itself as the recvmsg() wrappers always restart on EINTR. We choose
258 * to wait using interruptible epoll/poll() in order to:
259 * 1) Return if a signal occurs,
260 * 2) Not deal with partially received messages.
262 * The drawback to this approach is that we assume that messages
263 * are complete/well formed. If a message is shorter than its
264 * announced length, receive_message() will block on recvmsg()
265 * and never return (even if a signal is received).
267 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
269 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
272 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
274 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
277 ret
= lttng_poll_wait_interruptible(&events
, -1);
279 status
= (ret
== -1 && errno
== EINTR
) ?
280 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
281 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
285 ret
= receive_message(channel
);
287 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
291 switch (get_current_message_type(channel
)) {
292 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
293 notification
= create_notification_from_current_message(
296 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
300 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
301 /* No payload to consume. */
302 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
305 /* Protocol error. */
306 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
311 lttng_poll_clean(&events
);
313 pthread_mutex_unlock(&channel
->lock
);
314 *_notification
= notification
;
320 int enqueue_dropped_notification(
321 struct lttng_notification_channel
*channel
)
324 struct pending_notification
*pending_notification
;
325 struct cds_list_head
*last_element
=
326 channel
->pending_notifications
.list
.prev
;
328 pending_notification
= caa_container_of(last_element
,
329 struct pending_notification
, node
);
330 if (!pending_notification
->notification
) {
332 * The last enqueued notification indicates dropped
333 * notifications; there is nothing to do as we group
334 * dropped notifications together.
339 if (channel
->pending_notifications
.count
>=
340 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
341 pending_notification
->notification
) {
343 * Discard the last enqueued notification to indicate
344 * that notifications were dropped at this point.
346 lttng_notification_destroy(
347 pending_notification
->notification
);
348 pending_notification
->notification
= NULL
;
352 pending_notification
= zmalloc(sizeof(*pending_notification
));
353 if (!pending_notification
) {
357 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
358 cds_list_add(&pending_notification
->node
,
359 &channel
->pending_notifications
.list
);
360 channel
->pending_notifications
.count
++;
/*
 * Queues the notification carried by the message currently stored in the
 * reception buffer so that it can be delivered later.
 *
 * Visible steps:
 *  - when the pending count has reached
 *    DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT, the notification is
 *    dropped and enqueue_dropped_notification() is called instead;
 *  - otherwise a pending_notification entry is allocated, a notification is
 *    created from the current message, stored in the entry, and the entry is
 *    added to the channel's pending list while the count is incremented;
 *  - the free() at the end releases the entry, presumably on the error path
 *    where the notification could not be created - confirm against the
 *    upstream file.
 *
 * NOTE(review): cds_list_add() inserts at the head of the list while
 * lttng_notification_channel_get_next_notification() delivers
 * cds_list_first_entry(); this looks like newest-first delivery - confirm
 * whether cds_list_add_tail() was intended.
 * NOTE(review): the declaration of `ret`, the exits and the return statement
 * are not present in this listing.
 */
366 int enqueue_notification_from_current_message(
367 struct lttng_notification_channel
*channel
)
370 struct lttng_notification
*notification
;
371 struct pending_notification
*pending_notification
;
373 if (channel
->pending_notifications
.count
>=
374 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
375 /* Drop the notification. */
376 ret
= enqueue_dropped_notification(channel
);
380 pending_notification
= zmalloc(sizeof(*pending_notification
));
381 if (!pending_notification
) {
385 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
387 notification
= create_notification_from_current_message(channel
);
393 pending_notification
->notification
= notification
;
394 cds_list_add(&pending_notification
->node
,
395 &channel
->pending_notifications
.list
);
396 channel
->pending_notifications
.count
++;
400 free(pending_notification
);
/*
 * Reports through `_notification_pending` whether a notification can be
 * obtained from the channel.
 *
 * Visible steps:
 *  - a NULL `channel` or `_notification_pending` yields
 *    LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
 *  - the channel's lock is taken;
 *  - a non-zero pending count reports true without touching the socket;
 *  - a negative socket yields LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
 *  - the socket is polled with a timeout of 0; "no data" reports false and a
 *    negative poll result yields STATUS_ERROR;
 *  - when data is available, a message is received and dispatched on its
 *    type: NOTIFICATION and NOTIFICATION_DROPPED are queued (through
 *    enqueue_notification_from_current_message() and
 *    enqueue_dropped_notification() respectively) and report true, anything
 *    else is a protocol error;
 *  - the poll set is cleaned and the lock released.
 *
 * NOTE(review): the conditions guarding several status assignments, the
 * labels and the return statement are not present in this listing.
 */
404 enum lttng_notification_channel_status
405 lttng_notification_channel_has_pending_notification(
406 struct lttng_notification_channel
*channel
,
407 bool *_notification_pending
)
410 enum lttng_notification_channel_status status
=
411 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
412 struct lttng_poll_event events
;
414 if (!channel
|| !_notification_pending
) {
415 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
419 pthread_mutex_lock(&channel
->lock
);
421 if (channel
->pending_notifications
.count
) {
422 *_notification_pending
= true;
426 if (channel
->socket
< 0) {
427 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
432 * Check, without blocking, if data is available on the channel's
433 * socket. If there is data available, it is safe to read (blocking)
434 * on the socket for a message from the session daemon.
436 * Since all commands wait for the session daemon's reply before
437 * releasing the channel's lock, the protocol only allows for
438 * notifications and "notification dropped" messages to come
439 * through. If we receive a different message type, it is
440 * considered a protocol error.
442 * Note that this function is not guaranteed not to block. This
443 * will block until our peer (the session daemon) has sent a complete
444 * message if we see data available on the socket. If the peer does
445 * not respect the protocol, this may block indefinitely.
447 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
449 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
452 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
454 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
457 /* timeout = 0: return immediately. */
458 ret
= lttng_poll_wait_interruptible(&events
, 0);
460 /* No data available. */
461 *_notification_pending
= false;
463 } else if (ret
< 0) {
464 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
468 /* Data available on socket. */
469 ret
= receive_message(channel
);
471 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
475 switch (get_current_message_type(channel
)) {
476 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
477 ret
= enqueue_notification_from_current_message(channel
);
481 *_notification_pending
= true;
483 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
484 ret
= enqueue_dropped_notification(channel
);
488 *_notification_pending
= true;
491 /* Protocol error. */
492 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
497 lttng_poll_clean(&events
);
499 pthread_mutex_unlock(&channel
->lock
);
/*
 * Receives the session daemon's reply to a command and stores the status it
 * carries in `*status`.
 *
 * Visible steps: a message is received and its type inspected;
 *  - COMMAND_REPLY is the message this function is waiting for;
 *  - NOTIFICATION and NOTIFICATION_DROPPED messages are queued through
 *    enqueue_notification_from_current_message() and
 *    enqueue_dropped_notification();
 *  - HANDSHAKE copies the peer's major/minor version into the channel and
 *    marks the version as set.
 * The reply is then checked to be large enough before `reply->status` is
 * read from right after the message header.
 *
 * NOTE(review): the handshake case reads `major`/`minor` from the buffer
 * without a visible size check comparable to the one applied to the reply -
 * confirm whether one is needed.
 * NOTE(review): the enclosing loop/switch, the second operand of the size
 * check, the exits and the return statement are not present in this listing.
 */
505 int receive_command_reply(struct lttng_notification_channel
*channel
,
506 enum lttng_notification_channel_status
*status
)
509 struct lttng_notification_channel_command_reply
*reply
;
512 enum lttng_notification_channel_message_type msg_type
;
514 ret
= receive_message(channel
);
519 msg_type
= get_current_message_type(channel
);
521 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
523 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
524 ret
= enqueue_notification_from_current_message(
530 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
531 ret
= enqueue_dropped_notification(channel
);
536 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
538 struct lttng_notification_channel_command_handshake
*handshake
;
/* The handshake body starts right after the message header. */
540 handshake
= (struct lttng_notification_channel_command_handshake
*)
541 (channel
->reception_payload
.buffer
.data
+
542 sizeof(struct lttng_notification_channel_message
));
543 channel
->version
.major
= handshake
->major
;
544 channel
->version
.minor
= handshake
->minor
;
545 channel
->version
.set
= true;
555 if (channel
->reception_payload
.buffer
.size
<
556 (sizeof(struct lttng_notification_channel_message
) +
558 /* Invalid message received. */
563 reply
= (struct lttng_notification_channel_command_reply
*)
564 (channel
->reception_payload
.buffer
.data
+
565 sizeof(struct lttng_notification_channel_message
));
566 *status
= (enum lttng_notification_channel_status
) reply
->status
;
/*
 * Performs the protocol handshake with the session daemon on a freshly
 * connected channel.
 *
 * Visible steps:
 *  - a HANDSHAKE message header and a handshake body carrying
 *    LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR/MINOR are copied back to back
 *    into a stack buffer;
 *  - the channel's lock is taken and the buffer is sent with
 *    lttcomm_send_creds_unix_sock();
 *  - the daemon's answer is read with receive_command_reply(), which also
 *    records the peer's version in the channel;
 *  - the handshake is checked: the version must have been set and its major
 *    number must equal LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR;
 *  - the lock is released.
 *
 * NOTE(review): the declaration of `ret`, the bodies of the checks and the
 * return statement are not present in this listing.
 */
572 int handshake(struct lttng_notification_channel
*channel
)
575 enum lttng_notification_channel_status status
=
576 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
577 struct lttng_notification_channel_command_handshake handshake
= {
578 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
579 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
581 struct lttng_notification_channel_message msg_header
= {
582 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
583 .size
= sizeof(handshake
),
585 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
/* Header first, handshake body immediately after it. */
587 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
588 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
590 pthread_mutex_lock(&channel
->lock
);
592 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
593 sizeof(send_buffer
));
598 /* Receive handshake info from the sessiond. */
599 ret
= receive_command_reply(channel
, &status
);
604 if (!channel
->version
.set
) {
609 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
615 pthread_mutex_unlock(&channel
->lock
);
/*
 * Sends a SUBSCRIBE or UNSUBSCRIBE command for `condition` to the session
 * daemon and returns the resulting status.
 *
 * Visible steps:
 *  - a payload is initialized and `type` is asserted to be SUBSCRIBE or
 *    UNSUBSCRIBE;
 *  - the channel's lock is taken and its socket read;
 *  - the condition is validated (STATUS_INVALID when invalid);
 *  - the command header is appended to the payload, followed by the
 *    serialized condition;
 *  - the header's `size` field is patched with the payload length excluding
 *    the header, and its `fds` field with the fd handle count of the payload
 *    view;
 *  - the buffer is sent, then the fds, and the daemon's reply is read with
 *    receive_command_reply();
 *  - the lock is released and the payload reset.
 *
 * NOTE(review): the declarations of `ret` and `socket`, the conditions
 * guarding the status assignments, the labels and the return statement are
 * not present in this listing.
 */
620 enum lttng_notification_channel_status
send_condition_command(
621 struct lttng_notification_channel
*channel
,
622 enum lttng_notification_channel_message_type type
,
623 const struct lttng_condition
*condition
)
627 enum lttng_notification_channel_status status
=
628 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
629 struct lttng_payload payload
;
630 struct lttng_notification_channel_message cmd_header
= {
631 .type
= (int8_t) type
,
634 lttng_payload_init(&payload
);
637 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
641 assert(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
642 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
644 pthread_mutex_lock(&channel
->lock
);
645 socket
= channel
->socket
;
647 if (!lttng_condition_validate(condition
)) {
648 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
652 ret
= lttng_dynamic_buffer_append(&payload
.buffer
, &cmd_header
,
655 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
659 ret
= lttng_condition_serialize(condition
, &payload
);
661 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
/* The header was appended before the condition; patch it in place now that the final size is known. */
665 /* Update payload length. */
666 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->size
=
667 (uint32_t) (payload
.buffer
.size
- sizeof(cmd_header
));
670 struct lttng_payload_view pv
=
671 lttng_payload_view_from_payload(
674 lttng_payload_view_get_fd_handle_count(&pv
);
676 /* Update fd count. */
677 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->fds
=
680 ret
= lttcomm_send_unix_sock(
681 socket
, pv
.buffer
.data
, pv
.buffer
.size
);
683 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
687 /* Pass fds if present. */
689 ret
= lttcomm_send_payload_view_fds_unix_sock(socket
,
692 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
698 ret
= receive_command_reply(channel
, &status
);
700 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
704 pthread_mutex_unlock(&channel
->lock
);
706 lttng_payload_reset(&payload
);
710 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
711 struct lttng_notification_channel
*channel
,
712 const struct lttng_condition
*condition
)
714 return send_condition_command(channel
,
715 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
719 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
720 struct lttng_notification_channel
*channel
,
721 const struct lttng_condition
*condition
)
723 return send_condition_command(channel
,
724 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
728 void lttng_notification_channel_destroy(
729 struct lttng_notification_channel
*channel
)
735 if (channel
->socket
>= 0) {
736 (void) lttcomm_close_unix_sock(channel
->socket
);
738 pthread_mutex_destroy(&channel
->lock
);
739 lttng_payload_reset(&channel
->reception_payload
);