2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: LGPL-2.1-only
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
18 #include "lttng-ctl-helper.h"
19 #include <common/compat/poll.h>
/*
 * Forward declaration: the definition appears later in this file, but
 * lttng_notification_channel_create() needs to call it first.
 */
int handshake(struct lttng_notification_channel *channel);
25 * Populates the reception buffer with the next complete message.
26 * The caller must acquire the channel's lock.
29 int receive_message(struct lttng_notification_channel
*channel
)
32 struct lttng_notification_channel_message msg
;
34 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
39 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
45 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
50 /* Add message header at buffer's start. */
51 ret
= lttng_dynamic_buffer_append(&channel
->reception_buffer
, &msg
,
57 /* Reserve space for the payload. */
58 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_buffer
,
59 channel
->reception_buffer
.size
+ msg
.size
);
64 /* Receive message payload. */
65 ret
= lttcomm_recv_unix_sock(channel
->socket
,
66 channel
->reception_buffer
.data
+ sizeof(msg
), msg
.size
);
67 if (ret
< (ssize_t
) msg
.size
) {
75 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
82 enum lttng_notification_channel_message_type
get_current_message_type(
83 struct lttng_notification_channel
*channel
)
85 struct lttng_notification_channel_message
*msg
;
87 assert(channel
->reception_buffer
.size
>= sizeof(*msg
));
89 msg
= (struct lttng_notification_channel_message
*)
90 channel
->reception_buffer
.data
;
91 return (enum lttng_notification_channel_message_type
) msg
->type
;
95 struct lttng_notification
*create_notification_from_current_message(
96 struct lttng_notification_channel
*channel
)
99 struct lttng_notification
*notification
= NULL
;
101 if (channel
->reception_buffer
.size
<=
102 sizeof(struct lttng_notification_channel_message
)) {
107 struct lttng_payload_view view
= lttng_payload_view_from_dynamic_buffer(
108 &channel
->reception_buffer
,
109 sizeof(struct lttng_notification_channel_message
),
112 ret
= lttng_notification_create_from_payload(
113 &view
, ¬ification
);
116 if (ret
!= channel
->reception_buffer
.size
-
117 sizeof(struct lttng_notification_channel_message
)) {
118 lttng_notification_destroy(notification
);
126 struct lttng_notification_channel
*lttng_notification_channel_create(
127 struct lttng_endpoint
*endpoint
)
130 bool is_in_tracing_group
= false, is_root
= false;
131 char *sock_path
= NULL
;
132 struct lttng_notification_channel
*channel
= NULL
;
135 endpoint
!= lttng_session_daemon_notification_endpoint
) {
139 sock_path
= zmalloc(LTTNG_PATH_MAX
);
144 channel
= zmalloc(sizeof(struct lttng_notification_channel
));
148 channel
->socket
= -1;
149 pthread_mutex_init(&channel
->lock
, NULL
);
150 lttng_dynamic_buffer_init(&channel
->reception_buffer
);
151 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
153 is_root
= (getuid() == 0);
155 is_in_tracing_group
= lttng_check_tracing_group();
158 if (is_root
|| is_in_tracing_group
) {
159 lttng_ctl_copy_string(sock_path
,
160 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
162 ret
= lttcomm_connect_unix_sock(sock_path
);
169 /* Fallback to local session daemon. */
170 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
171 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
172 utils_get_home_dir());
173 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
177 ret
= lttcomm_connect_unix_sock(sock_path
);
184 channel
->socket
= fd
;
186 ret
= handshake(channel
);
194 lttng_notification_channel_destroy(channel
);
199 enum lttng_notification_channel_status
200 lttng_notification_channel_get_next_notification(
201 struct lttng_notification_channel
*channel
,
202 struct lttng_notification
**_notification
)
205 struct lttng_notification
*notification
= NULL
;
206 enum lttng_notification_channel_status status
=
207 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
208 struct lttng_poll_event events
;
210 if (!channel
|| !_notification
) {
211 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
215 pthread_mutex_lock(&channel
->lock
);
217 if (channel
->pending_notifications
.count
) {
218 struct pending_notification
*pending_notification
;
220 assert(!cds_list_empty(&channel
->pending_notifications
.list
));
222 /* Deliver one of the pending notifications. */
223 pending_notification
= cds_list_first_entry(
224 &channel
->pending_notifications
.list
,
225 struct pending_notification
,
227 notification
= pending_notification
->notification
;
229 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
231 cds_list_del(&pending_notification
->node
);
232 channel
->pending_notifications
.count
--;
233 free(pending_notification
);
238 * Block on interruptible epoll/poll() instead of the message reception
239 * itself as the recvmsg() wrappers always restart on EINTR. We choose
240 * to wait using interruptible epoll/poll() in order to:
241 * 1) Return if a signal occurs,
242 * 2) Not deal with partially received messages.
244 * The drawback to this approach is that we assume that messages
245 * are complete/well formed. If a message is shorter than its
246 * announced length, receive_message() will block on recvmsg()
247 * and never return (even if a signal is received).
249 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
251 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
254 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
256 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
259 ret
= lttng_poll_wait_interruptible(&events
, -1);
261 status
= (ret
== -1 && errno
== EINTR
) ?
262 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
263 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
267 ret
= receive_message(channel
);
269 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
273 switch (get_current_message_type(channel
)) {
274 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
275 notification
= create_notification_from_current_message(
278 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
282 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
283 /* No payload to consume. */
284 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
287 /* Protocol error. */
288 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
293 lttng_poll_clean(&events
);
295 pthread_mutex_unlock(&channel
->lock
);
296 *_notification
= notification
;
302 int enqueue_dropped_notification(
303 struct lttng_notification_channel
*channel
)
306 struct pending_notification
*pending_notification
;
307 struct cds_list_head
*last_element
=
308 channel
->pending_notifications
.list
.prev
;
310 pending_notification
= caa_container_of(last_element
,
311 struct pending_notification
, node
);
312 if (!pending_notification
->notification
) {
314 * The last enqueued notification indicates dropped
315 * notifications; there is nothing to do as we group
316 * dropped notifications together.
321 if (channel
->pending_notifications
.count
>=
322 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
323 pending_notification
->notification
) {
325 * Discard the last enqueued notification to indicate
326 * that notifications were dropped at this point.
328 lttng_notification_destroy(
329 pending_notification
->notification
);
330 pending_notification
->notification
= NULL
;
334 pending_notification
= zmalloc(sizeof(*pending_notification
));
335 if (!pending_notification
) {
339 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
340 cds_list_add(&pending_notification
->node
,
341 &channel
->pending_notifications
.list
);
342 channel
->pending_notifications
.count
++;
348 int enqueue_notification_from_current_message(
349 struct lttng_notification_channel
*channel
)
352 struct lttng_notification
*notification
;
353 struct pending_notification
*pending_notification
;
355 if (channel
->pending_notifications
.count
>=
356 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
357 /* Drop the notification. */
358 ret
= enqueue_dropped_notification(channel
);
362 pending_notification
= zmalloc(sizeof(*pending_notification
));
363 if (!pending_notification
) {
367 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
369 notification
= create_notification_from_current_message(channel
);
375 pending_notification
->notification
= notification
;
376 cds_list_add(&pending_notification
->node
,
377 &channel
->pending_notifications
.list
);
378 channel
->pending_notifications
.count
++;
382 free(pending_notification
);
386 enum lttng_notification_channel_status
387 lttng_notification_channel_has_pending_notification(
388 struct lttng_notification_channel
*channel
,
389 bool *_notification_pending
)
392 enum lttng_notification_channel_status status
=
393 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
394 struct lttng_poll_event events
;
396 if (!channel
|| !_notification_pending
) {
397 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
401 pthread_mutex_lock(&channel
->lock
);
403 if (channel
->pending_notifications
.count
) {
404 *_notification_pending
= true;
408 if (channel
->socket
< 0) {
409 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
414 * Check, without blocking, if data is available on the channel's
415 * socket. If there is data available, it is safe to read (blocking)
416 * on the socket for a message from the session daemon.
418 * Since all commands wait for the session daemon's reply before
419 * releasing the channel's lock, the protocol only allows for
420 * notifications and "notification dropped" messages to come
421 * through. If we receive a different message type, it is
422 * considered a protocol error.
424 * Note that this function is not guaranteed not to block. This
425 * will block until our peer (the session daemon) has sent a complete
426 * message if we see data available on the socket. If the peer does
427 * not respect the protocol, this may block indefinitely.
429 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
431 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
434 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
436 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
439 /* timeout = 0: return immediately. */
440 ret
= lttng_poll_wait_interruptible(&events
, 0);
442 /* No data available. */
443 *_notification_pending
= false;
445 } else if (ret
< 0) {
446 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
450 /* Data available on socket. */
451 ret
= receive_message(channel
);
453 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
457 switch (get_current_message_type(channel
)) {
458 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
459 ret
= enqueue_notification_from_current_message(channel
);
463 *_notification_pending
= true;
465 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
466 ret
= enqueue_dropped_notification(channel
);
470 *_notification_pending
= true;
473 /* Protocol error. */
474 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
479 lttng_poll_clean(&events
);
481 pthread_mutex_unlock(&channel
->lock
);
487 int receive_command_reply(struct lttng_notification_channel
*channel
,
488 enum lttng_notification_channel_status
*status
)
491 struct lttng_notification_channel_command_reply
*reply
;
494 enum lttng_notification_channel_message_type msg_type
;
496 ret
= receive_message(channel
);
501 msg_type
= get_current_message_type(channel
);
503 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
505 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
506 ret
= enqueue_notification_from_current_message(
512 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
513 ret
= enqueue_dropped_notification(channel
);
518 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
520 struct lttng_notification_channel_command_handshake
*handshake
;
522 handshake
= (struct lttng_notification_channel_command_handshake
*)
523 (channel
->reception_buffer
.data
+
524 sizeof(struct lttng_notification_channel_message
));
525 channel
->version
.major
= handshake
->major
;
526 channel
->version
.minor
= handshake
->minor
;
527 channel
->version
.set
= true;
537 if (channel
->reception_buffer
.size
<
538 (sizeof(struct lttng_notification_channel_message
) +
540 /* Invalid message received. */
545 reply
= (struct lttng_notification_channel_command_reply
*)
546 (channel
->reception_buffer
.data
+
547 sizeof(struct lttng_notification_channel_message
));
548 *status
= (enum lttng_notification_channel_status
) reply
->status
;
554 int handshake(struct lttng_notification_channel
*channel
)
557 enum lttng_notification_channel_status status
=
558 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
559 struct lttng_notification_channel_command_handshake handshake
= {
560 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
561 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
563 struct lttng_notification_channel_message msg_header
= {
564 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
565 .size
= sizeof(handshake
),
567 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
569 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
570 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
572 pthread_mutex_lock(&channel
->lock
);
574 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
575 sizeof(send_buffer
));
580 /* Receive handshake info from the sessiond. */
581 ret
= receive_command_reply(channel
, &status
);
586 if (!channel
->version
.set
) {
591 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
597 pthread_mutex_unlock(&channel
->lock
);
602 enum lttng_notification_channel_status
send_condition_command(
603 struct lttng_notification_channel
*channel
,
604 enum lttng_notification_channel_message_type type
,
605 const struct lttng_condition
*condition
)
609 enum lttng_notification_channel_status status
=
610 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
611 struct lttng_payload payload
;
612 struct lttng_notification_channel_message cmd_header
= {
613 .type
= (int8_t) type
,
616 lttng_payload_init(&payload
);
619 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
623 assert(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
624 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
626 pthread_mutex_lock(&channel
->lock
);
627 socket
= channel
->socket
;
628 if (!lttng_condition_validate(condition
)) {
629 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
633 ret
= lttng_dynamic_buffer_append(&payload
.buffer
, &cmd_header
,
636 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
640 ret
= lttng_condition_serialize(condition
, &payload
);
642 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
646 /* Update payload length. */
647 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->size
=
648 (uint32_t) (payload
.buffer
.size
- sizeof(cmd_header
));
650 ret
= lttcomm_send_unix_sock(
651 socket
, payload
.buffer
.data
, payload
.buffer
.size
);
653 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
657 ret
= receive_command_reply(channel
, &status
);
659 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
663 pthread_mutex_unlock(&channel
->lock
);
665 lttng_payload_reset(&payload
);
669 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
670 struct lttng_notification_channel
*channel
,
671 const struct lttng_condition
*condition
)
673 return send_condition_command(channel
,
674 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
678 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
679 struct lttng_notification_channel
*channel
,
680 const struct lttng_condition
*condition
)
682 return send_condition_command(channel
,
683 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
687 void lttng_notification_channel_destroy(
688 struct lttng_notification_channel
*channel
)
694 if (channel
->socket
>= 0) {
695 (void) lttcomm_close_unix_sock(channel
->socket
);
697 pthread_mutex_destroy(&channel
->lock
);
698 lttng_dynamic_buffer_reset(&channel
->reception_buffer
);