Commit | Line | Data |
---|---|---|
a58c490f JG |
1 | /* |
2 | * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
3 | * | |
4 | * This library is free software; you can redistribute it and/or modify it | |
5 | * under the terms of the GNU Lesser General Public License, version 2.1 only, | |
6 | * as published by the Free Software Foundation. | |
7 | * | |
8 | * This library is distributed in the hope that it will be useful, but WITHOUT | |
9 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
10 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License | |
11 | * for more details. | |
12 | * | |
13 | * You should have received a copy of the GNU Lesser General Public License | |
14 | * along with this library; if not, write to the Free Software Foundation, | |
15 | * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
16 | */ | |
17 | ||
18 | #include <lttng/notification/notification-internal.h> | |
19 | #include <lttng/notification/channel-internal.h> | |
20 | #include <lttng/condition/condition-internal.h> | |
21 | #include <lttng/endpoint.h> | |
22 | #include <common/defaults.h> | |
23 | #include <common/error.h> | |
24 | #include <common/dynamic-buffer.h> | |
25 | #include <common/utils.h> | |
26 | #include <common/defaults.h> | |
27 | #include <assert.h> | |
28 | #include "lttng-ctl-helper.h" | |
29 | ||
/* Forward declaration; the definition appears further down in this file. */
static int handshake(struct lttng_notification_channel *channel);
32 | ||
33 | /* | |
34 | * Populates the reception buffer with the next complete message. | |
35 | * The caller must acquire the client's lock. | |
36 | */ | |
37 | static | |
38 | int receive_message(struct lttng_notification_channel *channel) | |
39 | { | |
40 | ssize_t ret; | |
41 | struct lttng_notification_channel_message msg; | |
42 | ||
43 | ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0); | |
44 | if (ret) { | |
45 | goto error; | |
46 | } | |
47 | ||
48 | ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg)); | |
49 | if (ret <= 0) { | |
50 | ret = -1; | |
51 | goto error; | |
52 | } | |
53 | ||
54 | if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) { | |
55 | ret = -1; | |
56 | goto error; | |
57 | } | |
58 | ||
59 | /* Add message header at buffer's start. */ | |
60 | ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg, | |
61 | sizeof(msg)); | |
62 | if (ret) { | |
63 | goto error; | |
64 | } | |
65 | ||
66 | /* Reserve space for the payload. */ | |
67 | ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, | |
68 | channel->reception_buffer.size + msg.size); | |
69 | if (ret) { | |
70 | goto error; | |
71 | } | |
72 | ||
73 | /* Receive message payload. */ | |
74 | ret = lttcomm_recv_unix_sock(channel->socket, | |
75 | channel->reception_buffer.data + sizeof(msg), msg.size); | |
76 | if (ret < (ssize_t) msg.size) { | |
77 | ret = -1; | |
78 | goto error; | |
79 | } | |
80 | ret = 0; | |
81 | end: | |
82 | return ret; | |
83 | error: | |
e4f3498e JG |
84 | if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) { |
85 | ret = -1; | |
86 | } | |
a58c490f JG |
87 | goto end; |
88 | } | |
89 | ||
90 | static | |
91 | enum lttng_notification_channel_message_type get_current_message_type( | |
92 | struct lttng_notification_channel *channel) | |
93 | { | |
94 | struct lttng_notification_channel_message *msg; | |
95 | ||
96 | assert(channel->reception_buffer.size >= sizeof(*msg)); | |
97 | ||
98 | msg = (struct lttng_notification_channel_message *) | |
99 | channel->reception_buffer.data; | |
100 | return (enum lttng_notification_channel_message_type) msg->type; | |
101 | } | |
102 | ||
103 | static | |
104 | struct lttng_notification *create_notification_from_current_message( | |
105 | struct lttng_notification_channel *channel) | |
106 | { | |
107 | ssize_t ret; | |
108 | struct lttng_notification *notification = NULL; | |
109 | struct lttng_buffer_view view; | |
110 | ||
111 | if (channel->reception_buffer.size <= | |
112 | sizeof(struct lttng_notification_channel_message)) { | |
113 | goto end; | |
114 | } | |
115 | ||
116 | view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer, | |
117 | sizeof(struct lttng_notification_channel_message), -1); | |
118 | ||
119 | ret = lttng_notification_create_from_buffer(&view, ¬ification); | |
120 | if (ret != channel->reception_buffer.size - | |
121 | sizeof(struct lttng_notification_channel_message)) { | |
122 | lttng_notification_destroy(notification); | |
123 | notification = NULL; | |
124 | goto end; | |
125 | } | |
126 | end: | |
127 | return notification; | |
128 | } | |
129 | ||
130 | struct lttng_notification_channel *lttng_notification_channel_create( | |
131 | struct lttng_endpoint *endpoint) | |
132 | { | |
133 | int fd, ret; | |
134 | bool is_in_tracing_group = false, is_root = false; | |
135 | char *sock_path = NULL; | |
136 | struct lttng_notification_channel *channel = NULL; | |
137 | ||
138 | if (!endpoint || | |
139 | endpoint != lttng_session_daemon_notification_endpoint) { | |
140 | goto end; | |
141 | } | |
142 | ||
143 | sock_path = zmalloc(LTTNG_PATH_MAX); | |
144 | if (!sock_path) { | |
145 | goto end; | |
146 | } | |
147 | ||
148 | channel = zmalloc(sizeof(struct lttng_notification_channel)); | |
149 | if (!channel) { | |
150 | goto end; | |
151 | } | |
152 | channel->socket = -1; | |
153 | pthread_mutex_init(&channel->lock, NULL); | |
154 | lttng_dynamic_buffer_init(&channel->reception_buffer); | |
155 | CDS_INIT_LIST_HEAD(&channel->pending_notifications.list); | |
156 | ||
157 | is_root = (getuid() == 0); | |
158 | if (!is_root) { | |
159 | is_in_tracing_group = lttng_check_tracing_group(); | |
160 | } | |
161 | ||
162 | if (is_root || is_in_tracing_group) { | |
163 | lttng_ctl_copy_string(sock_path, | |
164 | DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK, | |
165 | LTTNG_PATH_MAX); | |
166 | ret = lttcomm_connect_unix_sock(sock_path); | |
167 | if (ret >= 0) { | |
168 | fd = ret; | |
169 | goto set_fd; | |
170 | } | |
171 | } | |
172 | ||
173 | /* Fallback to local session daemon. */ | |
174 | ret = snprintf(sock_path, LTTNG_PATH_MAX, | |
175 | DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK, | |
176 | utils_get_home_dir()); | |
177 | if (ret < 0 || ret >= LTTNG_PATH_MAX) { | |
178 | goto error; | |
179 | } | |
180 | ||
181 | ret = lttcomm_connect_unix_sock(sock_path); | |
182 | if (ret < 0) { | |
183 | goto error; | |
184 | } | |
185 | fd = ret; | |
186 | ||
187 | set_fd: | |
188 | channel->socket = fd; | |
189 | ||
190 | ret = handshake(channel); | |
191 | if (ret) { | |
192 | goto error; | |
193 | } | |
194 | end: | |
195 | free(sock_path); | |
196 | return channel; | |
197 | error: | |
198 | lttng_notification_channel_destroy(channel); | |
199 | channel = NULL; | |
200 | goto end; | |
201 | } | |
202 | ||
203 | enum lttng_notification_channel_status | |
204 | lttng_notification_channel_get_next_notification( | |
205 | struct lttng_notification_channel *channel, | |
206 | struct lttng_notification **_notification) | |
207 | { | |
208 | int ret; | |
209 | struct lttng_notification *notification = NULL; | |
210 | enum lttng_notification_channel_status status = | |
211 | LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; | |
212 | ||
213 | if (!channel || !_notification) { | |
214 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
215 | goto end; | |
216 | } | |
217 | ||
218 | if (channel->pending_notifications.count) { | |
219 | struct pending_notification *pending_notification; | |
220 | ||
221 | assert(!cds_list_empty(&channel->pending_notifications.list)); | |
222 | ||
223 | /* Deliver one of the pending notifications. */ | |
224 | pending_notification = cds_list_first_entry( | |
225 | &channel->pending_notifications.list, | |
226 | struct pending_notification, | |
227 | node); | |
228 | notification = pending_notification->notification; | |
229 | if (!notification) { | |
230 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED; | |
231 | } | |
232 | cds_list_del(&pending_notification->node); | |
233 | channel->pending_notifications.count--; | |
234 | free(pending_notification); | |
235 | goto end; | |
236 | } | |
237 | ||
238 | pthread_mutex_lock(&channel->lock); | |
239 | ||
240 | ret = receive_message(channel); | |
241 | if (ret) { | |
242 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
243 | goto end_unlock; | |
244 | } | |
245 | ||
246 | switch (get_current_message_type(channel)) { | |
247 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION: | |
248 | notification = create_notification_from_current_message( | |
249 | channel); | |
250 | if (!notification) { | |
251 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
252 | goto end_unlock; | |
253 | } | |
254 | break; | |
255 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: | |
256 | /* No payload to consume. */ | |
257 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED; | |
258 | break; | |
259 | default: | |
260 | /* Protocol error. */ | |
261 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
262 | goto end_unlock; | |
263 | } | |
264 | ||
265 | end_unlock: | |
266 | pthread_mutex_unlock(&channel->lock); | |
267 | end: | |
268 | if (_notification) { | |
269 | *_notification = notification; | |
270 | } | |
271 | return status; | |
272 | } | |
273 | ||
274 | static | |
275 | int enqueue_dropped_notification( | |
276 | struct lttng_notification_channel *channel) | |
277 | { | |
278 | int ret = 0; | |
279 | struct pending_notification *pending_notification; | |
280 | struct cds_list_head *last_element = | |
281 | channel->pending_notifications.list.prev; | |
282 | ||
283 | pending_notification = caa_container_of(last_element, | |
284 | struct pending_notification, node); | |
285 | if (!pending_notification->notification) { | |
286 | /* | |
287 | * The last enqueued notification indicates dropped | |
288 | * notifications; there is nothing to do as we group | |
289 | * dropped notifications together. | |
290 | */ | |
291 | goto end; | |
292 | } | |
293 | ||
294 | if (channel->pending_notifications.count >= | |
295 | DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT && | |
296 | pending_notification->notification) { | |
297 | /* | |
298 | * Discard the last enqueued notification to indicate | |
299 | * that notifications were dropped at this point. | |
300 | */ | |
301 | lttng_notification_destroy( | |
302 | pending_notification->notification); | |
303 | pending_notification->notification = NULL; | |
304 | goto end; | |
305 | } | |
306 | ||
307 | pending_notification = zmalloc(sizeof(*pending_notification)); | |
308 | if (!pending_notification) { | |
309 | ret = -1; | |
310 | goto end; | |
311 | } | |
312 | CDS_INIT_LIST_HEAD(&pending_notification->node); | |
313 | cds_list_add(&pending_notification->node, | |
314 | &channel->pending_notifications.list); | |
315 | channel->pending_notifications.count++; | |
316 | end: | |
317 | return ret; | |
318 | } | |
319 | ||
320 | static | |
321 | int enqueue_notification_from_current_message( | |
322 | struct lttng_notification_channel *channel) | |
323 | { | |
324 | int ret = 0; | |
325 | struct lttng_notification *notification; | |
326 | struct pending_notification *pending_notification; | |
327 | ||
328 | if (channel->pending_notifications.count >= | |
329 | DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) { | |
330 | /* Drop the notification. */ | |
331 | ret = enqueue_dropped_notification(channel); | |
332 | goto end; | |
333 | } | |
334 | ||
335 | pending_notification = zmalloc(sizeof(*pending_notification)); | |
336 | if (!pending_notification) { | |
337 | ret = -1; | |
338 | goto error; | |
339 | } | |
340 | CDS_INIT_LIST_HEAD(&pending_notification->node); | |
341 | ||
342 | notification = create_notification_from_current_message(channel); | |
343 | if (!notification) { | |
344 | ret = -1; | |
345 | goto error; | |
346 | } | |
347 | ||
348 | pending_notification->notification = notification; | |
349 | cds_list_add(&pending_notification->node, | |
350 | &channel->pending_notifications.list); | |
351 | channel->pending_notifications.count++; | |
352 | end: | |
353 | return ret; | |
354 | error: | |
355 | free(pending_notification); | |
356 | goto end; | |
357 | } | |
358 | ||
359 | static | |
360 | int receive_command_reply(struct lttng_notification_channel *channel, | |
361 | enum lttng_notification_channel_status *status) | |
362 | { | |
363 | int ret; | |
364 | struct lttng_notification_channel_command_reply *reply; | |
365 | ||
366 | while (true) { | |
367 | enum lttng_notification_channel_message_type msg_type; | |
368 | ||
369 | ret = receive_message(channel); | |
370 | if (ret) { | |
371 | goto end; | |
372 | } | |
373 | ||
374 | msg_type = get_current_message_type(channel); | |
375 | switch (msg_type) { | |
376 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY: | |
377 | goto exit_loop; | |
378 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION: | |
379 | ret = enqueue_notification_from_current_message( | |
380 | channel); | |
381 | if (ret) { | |
382 | goto end; | |
383 | } | |
384 | break; | |
385 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: | |
386 | ret = enqueue_dropped_notification(channel); | |
387 | if (ret) { | |
388 | goto end; | |
389 | } | |
390 | break; | |
391 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE: | |
392 | { | |
393 | struct lttng_notification_channel_command_handshake *handshake; | |
394 | ||
395 | handshake = (struct lttng_notification_channel_command_handshake *) | |
396 | (channel->reception_buffer.data + | |
397 | sizeof(struct lttng_notification_channel_message)); | |
398 | channel->version.major = handshake->major; | |
399 | channel->version.minor = handshake->minor; | |
400 | channel->version.set = true; | |
401 | break; | |
402 | } | |
403 | default: | |
404 | ret = -1; | |
405 | goto end; | |
406 | } | |
407 | } | |
408 | ||
409 | exit_loop: | |
410 | if (channel->reception_buffer.size < | |
411 | (sizeof(struct lttng_notification_channel_message) + | |
412 | sizeof(*reply))) { | |
413 | /* Invalid message received. */ | |
414 | ret = -1; | |
415 | goto end; | |
416 | } | |
417 | ||
418 | reply = (struct lttng_notification_channel_command_reply *) | |
419 | (channel->reception_buffer.data + | |
420 | sizeof(struct lttng_notification_channel_message)); | |
421 | *status = (enum lttng_notification_channel_status) reply->status; | |
422 | end: | |
423 | return ret; | |
424 | } | |
425 | ||
426 | static | |
427 | int handshake(struct lttng_notification_channel *channel) | |
428 | { | |
429 | ssize_t ret; | |
430 | enum lttng_notification_channel_status status = | |
431 | LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; | |
432 | struct lttng_notification_channel_command_handshake handshake = { | |
433 | .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR, | |
434 | .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR, | |
435 | }; | |
436 | struct lttng_notification_channel_message msg_header = { | |
437 | .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE, | |
438 | .size = sizeof(handshake), | |
439 | }; | |
440 | char send_buffer[sizeof(msg_header) + sizeof(handshake)]; | |
441 | ||
442 | memcpy(send_buffer, &msg_header, sizeof(msg_header)); | |
443 | memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake)); | |
444 | ||
445 | pthread_mutex_lock(&channel->lock); | |
446 | ||
01ea340e | 447 | ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer, |
a58c490f JG |
448 | sizeof(send_buffer)); |
449 | if (ret < 0) { | |
450 | goto end_unlock; | |
451 | } | |
452 | ||
453 | /* Receive handshake info from the sessiond. */ | |
454 | ret = receive_command_reply(channel, &status); | |
455 | if (ret < 0) { | |
456 | goto end_unlock; | |
457 | } | |
458 | ||
459 | if (!channel->version.set) { | |
460 | ret = -1; | |
461 | goto end_unlock; | |
462 | } | |
463 | ||
464 | if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) { | |
465 | ret = -1; | |
466 | goto end_unlock; | |
467 | } | |
468 | ||
469 | end_unlock: | |
470 | pthread_mutex_unlock(&channel->lock); | |
471 | return ret; | |
472 | } | |
473 | ||
474 | static | |
475 | enum lttng_notification_channel_status send_condition_command( | |
476 | struct lttng_notification_channel *channel, | |
477 | enum lttng_notification_channel_message_type type, | |
478 | const struct lttng_condition *condition) | |
479 | { | |
480 | int socket; | |
481 | ssize_t command_size, ret; | |
482 | enum lttng_notification_channel_status status = | |
483 | LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; | |
484 | char *command_buffer = NULL; | |
485 | struct lttng_notification_channel_message cmd_message = { | |
486 | .type = type, | |
487 | }; | |
488 | ||
489 | if (!channel) { | |
490 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
491 | goto end; | |
492 | } | |
493 | ||
494 | assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE || | |
495 | type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE); | |
496 | ||
497 | pthread_mutex_lock(&channel->lock); | |
498 | socket = channel->socket; | |
499 | if (!lttng_condition_validate(condition)) { | |
500 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
501 | goto end_unlock; | |
502 | } | |
503 | ||
504 | ret = lttng_condition_serialize(condition, NULL); | |
505 | if (ret < 0) { | |
506 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
507 | goto end_unlock; | |
508 | } | |
509 | assert(ret < UINT32_MAX); | |
510 | cmd_message.size = (uint32_t) ret; | |
511 | command_size = ret + sizeof( | |
512 | struct lttng_notification_channel_message); | |
513 | command_buffer = zmalloc(command_size); | |
514 | if (!command_buffer) { | |
515 | goto end_unlock; | |
516 | } | |
517 | ||
518 | memcpy(command_buffer, &cmd_message, sizeof(cmd_message)); | |
519 | ret = lttng_condition_serialize(condition, | |
520 | command_buffer + sizeof(cmd_message)); | |
521 | if (ret < 0) { | |
522 | goto end_unlock; | |
523 | } | |
524 | ||
525 | ret = lttcomm_send_unix_sock(socket, command_buffer, command_size); | |
526 | if (ret < 0) { | |
527 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
528 | goto end_unlock; | |
529 | } | |
530 | ||
531 | ret = receive_command_reply(channel, &status); | |
532 | if (ret < 0) { | |
533 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
534 | goto end_unlock; | |
535 | } | |
536 | end_unlock: | |
537 | pthread_mutex_unlock(&channel->lock); | |
538 | end: | |
539 | free(command_buffer); | |
540 | return status; | |
541 | } | |
542 | ||
543 | enum lttng_notification_channel_status lttng_notification_channel_subscribe( | |
544 | struct lttng_notification_channel *channel, | |
545 | const struct lttng_condition *condition) | |
546 | { | |
547 | return send_condition_command(channel, | |
548 | LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE, | |
549 | condition); | |
550 | } | |
551 | ||
552 | enum lttng_notification_channel_status lttng_notification_channel_unsubscribe( | |
553 | struct lttng_notification_channel *channel, | |
554 | const struct lttng_condition *condition) | |
555 | { | |
556 | return send_condition_command(channel, | |
557 | LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE, | |
558 | condition); | |
559 | } | |
560 | ||
561 | void lttng_notification_channel_destroy( | |
562 | struct lttng_notification_channel *channel) | |
563 | { | |
564 | if (!channel) { | |
565 | return; | |
566 | } | |
567 | ||
568 | if (channel->socket >= 0) { | |
569 | (void) lttcomm_close_unix_sock(channel->socket); | |
570 | } | |
571 | pthread_mutex_destroy(&channel->lock); | |
572 | lttng_dynamic_buffer_reset(&channel->reception_buffer); | |
573 | free(channel); | |
574 | } |