/*
 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License, version 2.1 only,
 * as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
28 #include "lttng-ctl-helper.h"
/*
 * Forward declaration: the channel creation function calls handshake()
 * before its definition appears in this file.
 */
static
int handshake(struct lttng_notification_channel *channel);
34 * Populates the reception buffer with the next complete message.
35 * The caller must acquire the client's lock.
38 int receive_message(struct lttng_notification_channel
*channel
)
41 struct lttng_notification_channel_message msg
;
43 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0);
48 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
54 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
59 /* Add message header at buffer's start. */
60 ret
= lttng_dynamic_buffer_append(&channel
->reception_buffer
, &msg
,
66 /* Reserve space for the payload. */
67 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_buffer
,
68 channel
->reception_buffer
.size
+ msg
.size
);
73 /* Receive message payload. */
74 ret
= lttcomm_recv_unix_sock(channel
->socket
,
75 channel
->reception_buffer
.data
+ sizeof(msg
), msg
.size
);
76 if (ret
< (ssize_t
) msg
.size
) {
84 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
91 enum lttng_notification_channel_message_type
get_current_message_type(
92 struct lttng_notification_channel
*channel
)
94 struct lttng_notification_channel_message
*msg
;
96 assert(channel
->reception_buffer
.size
>= sizeof(*msg
));
98 msg
= (struct lttng_notification_channel_message
*)
99 channel
->reception_buffer
.data
;
100 return (enum lttng_notification_channel_message_type
) msg
->type
;
104 struct lttng_notification
*create_notification_from_current_message(
105 struct lttng_notification_channel
*channel
)
108 struct lttng_notification
*notification
= NULL
;
109 struct lttng_buffer_view view
;
111 if (channel
->reception_buffer
.size
<=
112 sizeof(struct lttng_notification_channel_message
)) {
116 view
= lttng_buffer_view_from_dynamic_buffer(&channel
->reception_buffer
,
117 sizeof(struct lttng_notification_channel_message
), -1);
119 ret
= lttng_notification_create_from_buffer(&view
, ¬ification
);
120 if (ret
!= channel
->reception_buffer
.size
-
121 sizeof(struct lttng_notification_channel_message
)) {
122 lttng_notification_destroy(notification
);
130 struct lttng_notification_channel
*lttng_notification_channel_create(
131 struct lttng_endpoint
*endpoint
)
134 bool is_in_tracing_group
= false, is_root
= false;
135 char *sock_path
= NULL
;
136 struct lttng_notification_channel
*channel
= NULL
;
139 endpoint
!= lttng_session_daemon_notification_endpoint
) {
143 sock_path
= zmalloc(LTTNG_PATH_MAX
);
148 channel
= zmalloc(sizeof(struct lttng_notification_channel
));
152 channel
->socket
= -1;
153 pthread_mutex_init(&channel
->lock
, NULL
);
154 lttng_dynamic_buffer_init(&channel
->reception_buffer
);
155 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
157 is_root
= (getuid() == 0);
159 is_in_tracing_group
= lttng_check_tracing_group();
162 if (is_root
|| is_in_tracing_group
) {
163 lttng_ctl_copy_string(sock_path
,
164 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
166 ret
= lttcomm_connect_unix_sock(sock_path
);
173 /* Fallback to local session daemon. */
174 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
175 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
176 utils_get_home_dir());
177 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
181 ret
= lttcomm_connect_unix_sock(sock_path
);
188 channel
->socket
= fd
;
190 ret
= handshake(channel
);
198 lttng_notification_channel_destroy(channel
);
203 enum lttng_notification_channel_status
204 lttng_notification_channel_get_next_notification(
205 struct lttng_notification_channel
*channel
,
206 struct lttng_notification
**_notification
)
209 struct lttng_notification
*notification
= NULL
;
210 enum lttng_notification_channel_status status
=
211 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
213 if (!channel
|| !_notification
) {
214 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
218 if (channel
->pending_notifications
.count
) {
219 struct pending_notification
*pending_notification
;
221 assert(!cds_list_empty(&channel
->pending_notifications
.list
));
223 /* Deliver one of the pending notifications. */
224 pending_notification
= cds_list_first_entry(
225 &channel
->pending_notifications
.list
,
226 struct pending_notification
,
228 notification
= pending_notification
->notification
;
230 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
232 cds_list_del(&pending_notification
->node
);
233 channel
->pending_notifications
.count
--;
234 free(pending_notification
);
238 pthread_mutex_lock(&channel
->lock
);
240 ret
= receive_message(channel
);
242 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
246 switch (get_current_message_type(channel
)) {
247 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
248 notification
= create_notification_from_current_message(
251 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
255 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
256 /* No payload to consume. */
257 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
260 /* Protocol error. */
261 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
266 pthread_mutex_unlock(&channel
->lock
);
269 *_notification
= notification
;
275 int enqueue_dropped_notification(
276 struct lttng_notification_channel
*channel
)
279 struct pending_notification
*pending_notification
;
280 struct cds_list_head
*last_element
=
281 channel
->pending_notifications
.list
.prev
;
283 pending_notification
= caa_container_of(last_element
,
284 struct pending_notification
, node
);
285 if (!pending_notification
->notification
) {
287 * The last enqueued notification indicates dropped
288 * notifications; there is nothing to do as we group
289 * dropped notifications together.
294 if (channel
->pending_notifications
.count
>=
295 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
296 pending_notification
->notification
) {
298 * Discard the last enqueued notification to indicate
299 * that notifications were dropped at this point.
301 lttng_notification_destroy(
302 pending_notification
->notification
);
303 pending_notification
->notification
= NULL
;
307 pending_notification
= zmalloc(sizeof(*pending_notification
));
308 if (!pending_notification
) {
312 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
313 cds_list_add(&pending_notification
->node
,
314 &channel
->pending_notifications
.list
);
315 channel
->pending_notifications
.count
++;
321 int enqueue_notification_from_current_message(
322 struct lttng_notification_channel
*channel
)
325 struct lttng_notification
*notification
;
326 struct pending_notification
*pending_notification
;
328 if (channel
->pending_notifications
.count
>=
329 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
330 /* Drop the notification. */
331 ret
= enqueue_dropped_notification(channel
);
335 pending_notification
= zmalloc(sizeof(*pending_notification
));
336 if (!pending_notification
) {
340 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
342 notification
= create_notification_from_current_message(channel
);
348 pending_notification
->notification
= notification
;
349 cds_list_add(&pending_notification
->node
,
350 &channel
->pending_notifications
.list
);
351 channel
->pending_notifications
.count
++;
355 free(pending_notification
);
360 int receive_command_reply(struct lttng_notification_channel
*channel
,
361 enum lttng_notification_channel_status
*status
)
364 struct lttng_notification_channel_command_reply
*reply
;
367 enum lttng_notification_channel_message_type msg_type
;
369 ret
= receive_message(channel
);
374 msg_type
= get_current_message_type(channel
);
376 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
378 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
379 ret
= enqueue_notification_from_current_message(
385 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
386 ret
= enqueue_dropped_notification(channel
);
391 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
393 struct lttng_notification_channel_command_handshake
*handshake
;
395 handshake
= (struct lttng_notification_channel_command_handshake
*)
396 (channel
->reception_buffer
.data
+
397 sizeof(struct lttng_notification_channel_message
));
398 channel
->version
.major
= handshake
->major
;
399 channel
->version
.minor
= handshake
->minor
;
400 channel
->version
.set
= true;
410 if (channel
->reception_buffer
.size
<
411 (sizeof(struct lttng_notification_channel_message
) +
413 /* Invalid message received. */
418 reply
= (struct lttng_notification_channel_command_reply
*)
419 (channel
->reception_buffer
.data
+
420 sizeof(struct lttng_notification_channel_message
));
421 *status
= (enum lttng_notification_channel_status
) reply
->status
;
427 int handshake(struct lttng_notification_channel
*channel
)
430 enum lttng_notification_channel_status status
=
431 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
432 struct lttng_notification_channel_command_handshake handshake
= {
433 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
434 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
436 struct lttng_notification_channel_message msg_header
= {
437 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
438 .size
= sizeof(handshake
),
440 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
442 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
443 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
445 pthread_mutex_lock(&channel
->lock
);
447 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
448 sizeof(send_buffer
));
453 /* Receive handshake info from the sessiond. */
454 ret
= receive_command_reply(channel
, &status
);
459 if (!channel
->version
.set
) {
464 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
470 pthread_mutex_unlock(&channel
->lock
);
475 enum lttng_notification_channel_status
send_condition_command(
476 struct lttng_notification_channel
*channel
,
477 enum lttng_notification_channel_message_type type
,
478 const struct lttng_condition
*condition
)
481 ssize_t command_size
, ret
;
482 enum lttng_notification_channel_status status
=
483 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
484 char *command_buffer
= NULL
;
485 struct lttng_notification_channel_message cmd_message
= {
490 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
494 assert(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
495 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
497 pthread_mutex_lock(&channel
->lock
);
498 socket
= channel
->socket
;
499 if (!lttng_condition_validate(condition
)) {
500 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
504 ret
= lttng_condition_serialize(condition
, NULL
);
506 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
509 assert(ret
< UINT32_MAX
);
510 cmd_message
.size
= (uint32_t) ret
;
511 command_size
= ret
+ sizeof(
512 struct lttng_notification_channel_message
);
513 command_buffer
= zmalloc(command_size
);
514 if (!command_buffer
) {
518 memcpy(command_buffer
, &cmd_message
, sizeof(cmd_message
));
519 ret
= lttng_condition_serialize(condition
,
520 command_buffer
+ sizeof(cmd_message
));
525 ret
= lttcomm_send_unix_sock(socket
, command_buffer
, command_size
);
527 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
531 ret
= receive_command_reply(channel
, &status
);
533 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
537 pthread_mutex_unlock(&channel
->lock
);
539 free(command_buffer
);
543 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
544 struct lttng_notification_channel
*channel
,
545 const struct lttng_condition
*condition
)
547 return send_condition_command(channel
,
548 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
552 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
553 struct lttng_notification_channel
*channel
,
554 const struct lttng_condition
*condition
)
556 return send_condition_command(channel
,
557 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
561 void lttng_notification_channel_destroy(
562 struct lttng_notification_channel
*channel
)
568 if (channel
->socket
>= 0) {
569 (void) lttcomm_close_unix_sock(channel
->socket
);
571 pthread_mutex_destroy(&channel
->lock
);
572 lttng_dynamic_buffer_reset(&channel
->reception_buffer
);