2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include <common/common.h>
23 #include <common/utils.h>
24 #include <common/defaults.h>
25 #include <common/sessiond-comm/relayd.h>
26 #include <urcu/rculist.h>
29 #include "lttng-relayd.h"
32 #include "viewer-stream.h"
34 #include <sys/types.h>
37 #define FILE_IO_STACK_BUFFER_SIZE 65536
39 /* Should be called with RCU read-side lock held. */
40 bool stream_get(struct relay_stream
*stream
)
42 return urcu_ref_get_unless_zero(&stream
->ref
);
46 * Get stream from stream id from the streams hash table. Return stream
47 * if found else NULL. A stream reference is taken when a stream is
48 * returned. stream_put() must be called on that stream.
50 struct relay_stream
*stream_get_by_id(uint64_t stream_id
)
52 struct lttng_ht_node_u64
*node
;
53 struct lttng_ht_iter iter
;
54 struct relay_stream
*stream
= NULL
;
57 lttng_ht_lookup(relay_streams_ht
, &stream_id
, &iter
);
58 node
= lttng_ht_iter_get_node_u64(&iter
);
60 DBG("Relay stream %" PRIu64
" not found", stream_id
);
63 stream
= caa_container_of(node
, struct relay_stream
, node
);
64 if (!stream_get(stream
)) {
72 static void stream_complete_rotation(struct relay_stream
*stream
)
74 DBG("Rotation completed for stream %" PRIu64
, stream
->stream_handle
);
75 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
76 tracefile_array_reset(stream
->tfa
);
77 tracefile_array_commit_seq(stream
->tfa
,
78 stream
->index_received_seqcount
);
80 lttng_trace_chunk_put(stream
->trace_chunk
);
81 stream
->trace_chunk
= stream
->ongoing_rotation
.value
.next_trace_chunk
;
82 stream
->ongoing_rotation
= (typeof(stream
->ongoing_rotation
)) {};
85 static int stream_create_data_output_file_from_trace_chunk(
86 struct relay_stream
*stream
,
87 struct lttng_trace_chunk
*trace_chunk
,
89 struct stream_fd
**out_stream_fd
)
92 char stream_path
[LTTNG_PATH_MAX
];
93 enum lttng_trace_chunk_status status
;
94 const int flags
= O_RDWR
| O_CREAT
| O_TRUNC
;
95 const mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
;
97 ASSERT_LOCKED(stream
->lock
);
99 ret
= utils_stream_file_path(stream
->path_name
, stream
->channel_name
,
100 stream
->tracefile_size
, stream
->tracefile_current_index
,
101 NULL
, stream_path
, sizeof(stream_path
));
106 if (stream
->tracefile_wrapped_around
|| force_unlink
) {
108 * The on-disk ring-buffer has wrapped around.
109 * Newly created stream files will replace existing files. Since
110 * live clients may be consuming existing files, the file about
111 * to be replaced is unlinked in order to not overwrite its
114 status
= lttng_trace_chunk_unlink_file(trace_chunk
,
116 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
117 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
120 * Don't abort if the file doesn't exist, it is
121 * unexpected, but should not be a fatal error.
123 if (errno
!= ENOENT
) {
130 status
= lttng_trace_chunk_open_file(
131 trace_chunk
, stream_path
, flags
, mode
, &fd
, false);
132 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
133 ERR("Failed to open stream file \"%s\"", stream
->channel_name
);
138 *out_stream_fd
= stream_fd_create(fd
);
139 if (!*out_stream_fd
) {
141 PERROR("Error closing stream file descriptor %d", ret
);
150 static int stream_rotate_data_file(struct relay_stream
*stream
)
154 DBG("Rotating stream %" PRIu64
" data file with size %" PRIu64
,
155 stream
->stream_handle
, stream
->tracefile_size_current
);
157 if (stream
->stream_fd
) {
158 stream_fd_put(stream
->stream_fd
);
159 stream
->stream_fd
= NULL
;
162 stream
->tracefile_wrapped_around
= false;
163 stream
->tracefile_current_index
= 0;
165 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
166 struct stream_fd
*new_stream_fd
= NULL
;
167 enum lttng_trace_chunk_status chunk_status
;
169 chunk_status
= lttng_trace_chunk_create_subdirectory(
170 stream
->ongoing_rotation
.value
.next_trace_chunk
,
172 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
177 /* Rotate the data file. */
178 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
179 stream
->ongoing_rotation
.value
.next_trace_chunk
,
180 false, &new_stream_fd
);
181 stream
->stream_fd
= new_stream_fd
;
183 ERR("Failed to rotate stream data file");
187 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
188 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
189 stream
->tracefile_size_current
= 0;
190 stream
->pos_after_last_complete_data_index
= 0;
191 stream
->ongoing_rotation
.value
.data_rotated
= true;
193 if (stream
->ongoing_rotation
.value
.index_rotated
) {
194 /* Rotation completed; reset its state. */
195 stream_complete_rotation(stream
);
202 * If too much data has been written in a tracefile before we received the
203 * rotation command, we have to move the excess data to the new tracefile and
204 * perform the rotation. This can happen because the control and data
205 * connections are separate, the indexes as well as the commands arrive from
206 * the control connection and we have no control over the order so we could be
207 * in a situation where too much data has been received on the data connection
208 * before the rotation command on the control connection arrives.
210 static int rotate_truncate_stream(struct relay_stream
*stream
)
213 off_t lseek_ret
, previous_stream_copy_origin
;
214 uint64_t copy_bytes_left
, misplaced_data_size
;
215 bool acquired_reference
;
216 struct stream_fd
*previous_stream_fd
= NULL
;
217 struct lttng_trace_chunk
*previous_chunk
= NULL
;
219 if (!LTTNG_OPTIONAL_GET(stream
->ongoing_rotation
).next_trace_chunk
) {
220 ERR("Protocol error encoutered in %s(): stream rotation "
221 "sequence number is before the current sequence number "
222 "and the next trace chunk is unset. Honoring this "
223 "rotation command would result in data loss",
229 ASSERT_LOCKED(stream
->lock
);
231 * Acquire a reference to the current trace chunk to ensure
232 * it is not reclaimed when `stream_rotate_data_file` is called.
233 * Failing to do so would violate the contract of the trace
234 * chunk API as an active file descriptor would outlive the
237 acquired_reference
= lttng_trace_chunk_get(stream
->trace_chunk
);
238 assert(acquired_reference
);
239 previous_chunk
= stream
->trace_chunk
;
242 * Steal the stream's reference to its stream_fd. A new
243 * stream_fd will be created when the rotation completes and
244 * the orinal stream_fd will be used to copy the "extra" data
247 assert(stream
->stream_fd
);
248 previous_stream_fd
= stream
->stream_fd
;
249 stream
->stream_fd
= NULL
;
251 assert(!stream
->is_metadata
);
252 assert(stream
->tracefile_size_current
>
253 stream
->pos_after_last_complete_data_index
);
254 misplaced_data_size
= stream
->tracefile_size_current
-
255 stream
->pos_after_last_complete_data_index
;
256 copy_bytes_left
= misplaced_data_size
;
257 previous_stream_copy_origin
= stream
->pos_after_last_complete_data_index
;
259 ret
= stream_rotate_data_file(stream
);
264 assert(stream
->stream_fd
);
266 * Seek the current tracefile to the position at which the rotation
267 * should have occurred.
269 lseek_ret
= lseek(previous_stream_fd
->fd
, previous_stream_copy_origin
,
272 PERROR("Failed to seek to offset %" PRIu64
273 " while copying extra data received before a stream rotation",
274 (uint64_t) previous_stream_copy_origin
);
279 /* Move data from the old file to the new file. */
280 while (copy_bytes_left
) {
282 char copy_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
283 const off_t copy_size_this_pass
= min_t(
284 off_t
, copy_bytes_left
, sizeof(copy_buffer
));
286 io_ret
= lttng_read(previous_stream_fd
->fd
, copy_buffer
,
287 copy_size_this_pass
);
288 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
290 PERROR("Failed to read %" PRIu64
291 " bytes from fd %i in %s(), returned %zi",
293 previous_stream_fd
->fd
,
294 __FUNCTION__
, io_ret
);
296 ERR("Failed to read %" PRIu64
297 " bytes from fd %i in %s(), returned %zi",
299 previous_stream_fd
->fd
,
300 __FUNCTION__
, io_ret
);
306 io_ret
= lttng_write(stream
->stream_fd
->fd
, copy_buffer
,
307 copy_size_this_pass
);
308 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
310 PERROR("Failed to write %" PRIu64
311 " bytes from fd %i in %s(), returned %zi",
313 stream
->stream_fd
->fd
,
314 __FUNCTION__
, io_ret
);
316 ERR("Failed to write %" PRIu64
317 " bytes from fd %i in %s(), returned %zi",
319 stream
->stream_fd
->fd
,
320 __FUNCTION__
, io_ret
);
325 copy_bytes_left
-= copy_size_this_pass
;
328 /* Truncate the file to get rid of the excess data. */
329 ret
= ftruncate(previous_stream_fd
->fd
, previous_stream_copy_origin
);
331 PERROR("Failed to truncate current stream file to offset %" PRIu64
,
332 previous_stream_copy_origin
);
337 * Update the offset and FD of all the eventual indexes created by the
338 * data connection before the rotation command arrived.
340 ret
= relay_index_switch_all_files(stream
);
342 ERR("Failed to rotate index file");
346 stream
->tracefile_size_current
= misplaced_data_size
;
347 /* Index and data contents are back in sync. */
348 stream
->pos_after_last_complete_data_index
= 0;
351 lttng_trace_chunk_put(previous_chunk
);
352 stream_fd_put(previous_stream_fd
);
357 * Check if a stream's data file (as opposed to index) should be rotated
358 * (for session rotation).
359 * Must be called with the stream lock held.
361 * Return 0 on success, a negative value on error.
363 static int try_rotate_stream_data(struct relay_stream
*stream
)
367 if (caa_likely(!stream
->ongoing_rotation
.is_set
)) {
368 /* No rotation expected. */
372 if (stream
->ongoing_rotation
.value
.data_rotated
) {
373 /* Rotation of the data file has already occurred. */
377 DBG("%s: Stream %" PRIu64
378 " (rotate_at_index_packet_seq_num = %" PRIu64
379 ", rotate_at_prev_data_net_seq = %" PRIu64
380 ", prev_data_seq = %" PRIu64
")",
381 __func__
, stream
->stream_handle
,
382 stream
->ongoing_rotation
.value
.packet_seq_num
,
383 stream
->ongoing_rotation
.value
.prev_data_net_seq
,
384 stream
->prev_data_seq
);
386 if (stream
->prev_data_seq
== -1ULL ||
387 stream
->ongoing_rotation
.value
.prev_data_net_seq
== -1ULL ||
388 stream
->prev_data_seq
<
389 stream
->ongoing_rotation
.value
.prev_data_net_seq
) {
391 * The next packet that will be written is not part of the next
394 DBG("Stream %" PRIu64
" data not yet ready for rotation "
395 "(rotate_at_index_packet_seq_num = %" PRIu64
396 ", rotate_at_prev_data_net_seq = %" PRIu64
397 ", prev_data_seq = %" PRIu64
")",
398 stream
->stream_handle
,
399 stream
->ongoing_rotation
.value
.packet_seq_num
,
400 stream
->ongoing_rotation
.value
.prev_data_net_seq
,
401 stream
->prev_data_seq
);
403 } else if (stream
->prev_data_seq
> stream
->ongoing_rotation
.value
.prev_data_net_seq
) {
405 * prev_data_seq is checked here since indexes and rotation
406 * commands are serialized with respect to each other.
408 DBG("Rotation after too much data has been written in tracefile "
409 "for stream %" PRIu64
", need to truncate before "
410 "rotating", stream
->stream_handle
);
411 ret
= rotate_truncate_stream(stream
);
413 ERR("Failed to truncate stream");
417 ret
= stream_rotate_data_file(stream
);
425 * Close the current index file if it is open, and create a new one.
427 * Return 0 on success, -1 on error.
429 static int create_index_file(struct relay_stream
*stream
,
430 struct lttng_trace_chunk
*chunk
)
433 uint32_t major
, minor
;
434 char *index_subpath
= NULL
;
435 enum lttng_trace_chunk_status status
;
437 ASSERT_LOCKED(stream
->lock
);
439 /* Put ref on previous index_file. */
440 if (stream
->index_file
) {
441 lttng_index_file_put(stream
->index_file
);
442 stream
->index_file
= NULL
;
444 major
= stream
->trace
->session
->major
;
445 minor
= stream
->trace
->session
->minor
;
451 ret
= asprintf(&index_subpath
, "%s/%s", stream
->path_name
,
457 status
= lttng_trace_chunk_create_subdirectory(chunk
,
460 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
464 status
= lttng_index_file_create_from_trace_chunk(
465 chunk
, stream
->path_name
,
466 stream
->channel_name
, stream
->tracefile_size
,
467 stream
->tracefile_current_index
,
468 lttng_to_index_major(major
, minor
),
469 lttng_to_index_minor(major
, minor
), true,
470 &stream
->index_file
);
471 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
483 * Check if a stream's index file should be rotated (for session rotation).
484 * Must be called with the stream lock held.
486 * Return 0 on success, a negative value on error.
488 static int try_rotate_stream_index(struct relay_stream
*stream
)
492 if (!stream
->ongoing_rotation
.is_set
) {
493 /* No rotation expected. */
497 if (stream
->ongoing_rotation
.value
.index_rotated
) {
498 /* Rotation of the index has already occurred. */
502 DBG("%s: Stream %" PRIu64
503 " (rotate_at_packet_seq_num = %" PRIu64
504 ", received_packet_seq_num = "
505 "(value = %" PRIu64
", is_set = %" PRIu8
"))",
506 __func__
, stream
->stream_handle
,
507 stream
->ongoing_rotation
.value
.packet_seq_num
,
508 stream
->received_packet_seq_num
.value
,
509 stream
->received_packet_seq_num
.is_set
);
511 if (!stream
->received_packet_seq_num
.is_set
||
512 LTTNG_OPTIONAL_GET(stream
->received_packet_seq_num
) + 1 <
513 stream
->ongoing_rotation
.value
.packet_seq_num
) {
514 DBG("Stream %" PRIu64
" index not yet ready for rotation "
515 "(rotate_at_packet_seq_num = %" PRIu64
516 ", received_packet_seq_num = "
517 "(value = %" PRIu64
", is_set = %" PRIu8
"))",
518 stream
->stream_handle
,
519 stream
->ongoing_rotation
.value
.packet_seq_num
,
520 stream
->received_packet_seq_num
.value
,
521 stream
->received_packet_seq_num
.is_set
);
525 * The next index belongs to the new trace chunk; rotate.
526 * In overwrite mode, the packet seq num may jump over the
529 assert(LTTNG_OPTIONAL_GET(stream
->received_packet_seq_num
) + 1 >=
530 stream
->ongoing_rotation
.value
.packet_seq_num
);
531 DBG("Rotating stream %" PRIu64
" index file",
532 stream
->stream_handle
);
533 if (stream
->index_file
) {
534 lttng_index_file_put(stream
->index_file
);
535 stream
->index_file
= NULL
;
537 stream
->ongoing_rotation
.value
.index_rotated
= true;
540 * Set the rotation pivot position for the data, now that we have the
541 * net_seq_num matching the packet_seq_num index pivot position.
543 stream
->ongoing_rotation
.value
.prev_data_net_seq
=
544 stream
->prev_index_seq
;
545 if (stream
->ongoing_rotation
.value
.data_rotated
&&
546 stream
->ongoing_rotation
.value
.index_rotated
) {
547 /* Rotation completed; reset its state. */
548 DBG("Rotation completed for stream %" PRIu64
,
549 stream
->stream_handle
);
550 stream_complete_rotation(stream
);
558 static int stream_set_trace_chunk(struct relay_stream
*stream
,
559 struct lttng_trace_chunk
*chunk
)
562 enum lttng_trace_chunk_status status
;
563 bool acquired_reference
;
564 struct stream_fd
*new_stream_fd
= NULL
;
566 status
= lttng_trace_chunk_create_subdirectory(chunk
,
568 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
573 lttng_trace_chunk_put(stream
->trace_chunk
);
574 acquired_reference
= lttng_trace_chunk_get(chunk
);
575 assert(acquired_reference
);
576 stream
->trace_chunk
= chunk
;
578 if (stream
->stream_fd
) {
579 stream_fd_put(stream
->stream_fd
);
580 stream
->stream_fd
= NULL
;
582 ret
= stream_create_data_output_file_from_trace_chunk(stream
, chunk
,
583 false, &new_stream_fd
);
584 stream
->stream_fd
= new_stream_fd
;
590 * We keep ownership of path_name and channel_name.
592 struct relay_stream
*stream_create(struct ctf_trace
*trace
,
593 uint64_t stream_handle
, char *path_name
,
594 char *channel_name
, uint64_t tracefile_size
,
595 uint64_t tracefile_count
)
598 struct relay_stream
*stream
= NULL
;
599 struct relay_session
*session
= trace
->session
;
600 bool acquired_reference
= false;
601 struct lttng_trace_chunk
*current_trace_chunk
;
603 stream
= zmalloc(sizeof(struct relay_stream
));
604 if (stream
== NULL
) {
605 PERROR("relay stream zmalloc");
609 stream
->stream_handle
= stream_handle
;
610 stream
->prev_data_seq
= -1ULL;
611 stream
->prev_index_seq
= -1ULL;
612 stream
->last_net_seq_num
= -1ULL;
613 stream
->ctf_stream_id
= -1ULL;
614 stream
->tracefile_size
= tracefile_size
;
615 stream
->tracefile_count
= tracefile_count
;
616 stream
->path_name
= path_name
;
617 stream
->channel_name
= channel_name
;
618 stream
->beacon_ts_end
= -1ULL;
619 lttng_ht_node_init_u64(&stream
->node
, stream
->stream_handle
);
620 pthread_mutex_init(&stream
->lock
, NULL
);
621 urcu_ref_init(&stream
->ref
);
622 ctf_trace_get(trace
);
623 stream
->trace
= trace
;
625 pthread_mutex_lock(&trace
->session
->lock
);
626 current_trace_chunk
= trace
->session
->current_trace_chunk
;
627 if (current_trace_chunk
) {
628 acquired_reference
= lttng_trace_chunk_get(current_trace_chunk
);
630 pthread_mutex_unlock(&trace
->session
->lock
);
631 if (!acquired_reference
) {
632 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
638 stream
->indexes_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
639 if (!stream
->indexes_ht
) {
640 ERR("Cannot created indexes_ht");
645 pthread_mutex_lock(&stream
->lock
);
646 ret
= stream_set_trace_chunk(stream
, current_trace_chunk
);
647 pthread_mutex_unlock(&stream
->lock
);
649 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
650 trace
->session
->session_name
,
651 stream
->channel_name
);
655 stream
->tfa
= tracefile_array_create(stream
->tracefile_count
);
661 stream
->is_metadata
= !strcmp(stream
->channel_name
,
662 DEFAULT_METADATA_NAME
);
663 stream
->in_recv_list
= true;
666 * Add the stream in the recv list of the session. Once the end stream
667 * message is received, all session streams are published.
669 pthread_mutex_lock(&session
->recv_list_lock
);
670 cds_list_add_rcu(&stream
->recv_node
, &session
->recv_list
);
671 session
->stream_count
++;
672 pthread_mutex_unlock(&session
->recv_list_lock
);
675 * Both in the ctf_trace object and the global stream ht since the data
676 * side of the relayd does not have the concept of session.
678 lttng_ht_add_unique_u64(relay_streams_ht
, &stream
->node
);
679 stream
->in_stream_ht
= true;
681 DBG("Relay new stream added %s with ID %" PRIu64
, stream
->channel_name
,
682 stream
->stream_handle
);
687 if (stream
->stream_fd
) {
688 stream_fd_put(stream
->stream_fd
);
689 stream
->stream_fd
= NULL
;
694 if (acquired_reference
) {
695 lttng_trace_chunk_put(current_trace_chunk
);
701 * path_name and channel_name need to be freed explicitly here
702 * because we cannot rely on stream_put().
710 * Called with the session lock held.
712 void stream_publish(struct relay_stream
*stream
)
714 struct relay_session
*session
;
716 pthread_mutex_lock(&stream
->lock
);
717 if (stream
->published
) {
721 session
= stream
->trace
->session
;
723 pthread_mutex_lock(&session
->recv_list_lock
);
724 if (stream
->in_recv_list
) {
725 cds_list_del_rcu(&stream
->recv_node
);
726 stream
->in_recv_list
= false;
728 pthread_mutex_unlock(&session
->recv_list_lock
);
730 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
731 cds_list_add_rcu(&stream
->stream_node
, &stream
->trace
->stream_list
);
732 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
734 stream
->published
= true;
736 pthread_mutex_unlock(&stream
->lock
);
740 * Stream must be protected by holding the stream lock or by virtue of being
741 * called from stream_destroy.
743 static void stream_unpublish(struct relay_stream
*stream
)
745 if (stream
->in_stream_ht
) {
746 struct lttng_ht_iter iter
;
749 iter
.iter
.node
= &stream
->node
.node
;
750 ret
= lttng_ht_del(relay_streams_ht
, &iter
);
752 stream
->in_stream_ht
= false;
754 if (stream
->published
) {
755 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
756 cds_list_del_rcu(&stream
->stream_node
);
757 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
758 stream
->published
= false;
762 static void stream_destroy(struct relay_stream
*stream
)
764 if (stream
->indexes_ht
) {
766 * Calling lttng_ht_destroy in call_rcu worker thread so
767 * we don't hold the RCU read-side lock while calling
770 lttng_ht_destroy(stream
->indexes_ht
);
773 tracefile_array_destroy(stream
->tfa
);
775 free(stream
->path_name
);
776 free(stream
->channel_name
);
780 static void stream_destroy_rcu(struct rcu_head
*rcu_head
)
782 struct relay_stream
*stream
=
783 caa_container_of(rcu_head
, struct relay_stream
, rcu_node
);
785 stream_destroy(stream
);
789 * No need to take stream->lock since this is only called on the final
790 * stream_put which ensures that a single thread may act on the stream.
792 static void stream_release(struct urcu_ref
*ref
)
794 struct relay_stream
*stream
=
795 caa_container_of(ref
, struct relay_stream
, ref
);
796 struct relay_session
*session
;
798 session
= stream
->trace
->session
;
800 DBG("Releasing stream id %" PRIu64
, stream
->stream_handle
);
802 pthread_mutex_lock(&session
->recv_list_lock
);
803 session
->stream_count
--;
804 if (stream
->in_recv_list
) {
805 cds_list_del_rcu(&stream
->recv_node
);
806 stream
->in_recv_list
= false;
808 pthread_mutex_unlock(&session
->recv_list_lock
);
810 stream_unpublish(stream
);
812 if (stream
->stream_fd
) {
813 stream_fd_put(stream
->stream_fd
);
814 stream
->stream_fd
= NULL
;
816 if (stream
->index_file
) {
817 lttng_index_file_put(stream
->index_file
);
818 stream
->index_file
= NULL
;
821 ctf_trace_put(stream
->trace
);
822 stream
->trace
= NULL
;
824 stream_complete_rotation(stream
);
825 lttng_trace_chunk_put(stream
->trace_chunk
);
826 stream
->trace_chunk
= NULL
;
828 call_rcu(&stream
->rcu_node
, stream_destroy_rcu
);
831 void stream_put(struct relay_stream
*stream
)
834 assert(stream
->ref
.refcount
!= 0);
836 * Wait until we have processed all the stream packets before
837 * actually putting our last stream reference.
839 urcu_ref_put(&stream
->ref
, stream_release
);
843 int stream_set_pending_rotation(struct relay_stream
*stream
,
844 struct lttng_trace_chunk
*next_trace_chunk
,
845 uint64_t rotation_sequence_number
)
848 const struct relay_stream_rotation rotation
= {
849 .data_rotated
= false,
850 .index_rotated
= false,
851 .packet_seq_num
= rotation_sequence_number
,
852 .prev_data_net_seq
= -1ULL,
853 .next_trace_chunk
= next_trace_chunk
,
856 if (stream
->ongoing_rotation
.is_set
) {
857 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
862 if (next_trace_chunk
) {
863 const bool reference_acquired
=
864 lttng_trace_chunk_get(next_trace_chunk
);
866 assert(reference_acquired
);
868 LTTNG_OPTIONAL_SET(&stream
->ongoing_rotation
, rotation
);
870 DBG("Setting pending rotation: stream_id = %" PRIu64
871 ", rotate_at_packet_seq_num = %" PRIu64
,
872 stream
->stream_handle
, rotation_sequence_number
);
873 if (stream
->is_metadata
) {
875 * A metadata stream has no index; consider it already rotated.
877 stream
->ongoing_rotation
.value
.index_rotated
= true;
878 if (next_trace_chunk
) {
880 * The metadata will be received again in the new chunk.
882 stream
->metadata_received
= 0;
884 ret
= stream_rotate_data_file(stream
);
886 ret
= try_rotate_stream_index(stream
);
891 ret
= try_rotate_stream_data(stream
);
900 void try_stream_close(struct relay_stream
*stream
)
902 bool session_aborted
;
903 struct relay_session
*session
= stream
->trace
->session
;
905 DBG("Trying to close stream %" PRIu64
, stream
->stream_handle
);
907 pthread_mutex_lock(&session
->lock
);
908 session_aborted
= session
->aborted
;
909 pthread_mutex_unlock(&session
->lock
);
911 pthread_mutex_lock(&stream
->lock
);
913 * Can be called concurently by connection close and reception of last
916 if (stream
->closed
) {
917 pthread_mutex_unlock(&stream
->lock
);
918 DBG("closing stream %" PRIu64
" aborted since it is already marked as closed", stream
->stream_handle
);
922 stream
->close_requested
= true;
924 if (stream
->last_net_seq_num
== -1ULL) {
926 * Handle connection close without explicit stream close
929 * We can be clever about indexes partially received in
930 * cases where we received the data socket part, but not
931 * the control socket part: since we're currently closing
932 * the stream on behalf of the control socket, we *know*
933 * there won't be any more control information for this
934 * socket. Therefore, we can destroy all indexes for
935 * which we have received only the file descriptor (from
936 * data socket). This takes care of consumerd crashes
937 * between sending the data and control information for
938 * a packet. Since those are sent in that order, we take
939 * care of consumerd crashes.
941 DBG("relay_index_close_partial_fd");
942 relay_index_close_partial_fd(stream
);
944 * Use the highest net_seq_num we currently have pending
945 * As end of stream indicator. Leave last_net_seq_num
946 * at -1ULL if we cannot find any index.
948 stream
->last_net_seq_num
= relay_index_find_last(stream
);
949 DBG("Updating stream->last_net_seq_num to %" PRIu64
, stream
->last_net_seq_num
);
950 /* Fall-through into the next check. */
953 if (stream
->last_net_seq_num
!= -1ULL &&
954 ((int64_t) (stream
->prev_data_seq
- stream
->last_net_seq_num
)) < 0
955 && !session_aborted
) {
957 * Don't close since we still have data pending. This
958 * handles cases where an explicit close command has
959 * been received for this stream, and cases where the
960 * connection has been closed, and we are awaiting for
961 * index information from the data socket. It is
962 * therefore expected that all the index fd information
963 * we need has already been received on the control
964 * socket. Matching index information from data socket
965 * should be Expected Soon(TM).
967 * TODO: We should implement a timer to garbage collect
968 * streams after a timeout to be resilient against a
969 * consumerd implementation that would not match this
972 pthread_mutex_unlock(&stream
->lock
);
973 DBG("closing stream %" PRIu64
" aborted since it still has data pending", stream
->stream_handle
);
977 * We received all the indexes we can expect.
979 stream_unpublish(stream
);
980 stream
->closed
= true;
981 /* Relay indexes are only used by the "consumer/sessiond" end. */
982 relay_index_close_all(stream
);
985 * If we are closed by an application exiting (per-pid buffers),
986 * we need to put our reference on the stream trace chunk right
987 * away, because otherwise still holding the reference on the
988 * trace chunk could allow a viewer stream (which holds a reference
989 * to the stream) to postpone destroy waiting for the chunk to cease
990 * to exist endlessly until the viewer is detached.
993 /* Put stream fd before put chunk. */
994 if (stream
->stream_fd
) {
995 stream_fd_put(stream
->stream_fd
);
996 stream
->stream_fd
= NULL
;
998 if (stream
->index_file
) {
999 lttng_index_file_put(stream
->index_file
);
1000 stream
->index_file
= NULL
;
1002 lttng_trace_chunk_put(stream
->trace_chunk
);
1003 stream
->trace_chunk
= NULL
;
1004 pthread_mutex_unlock(&stream
->lock
);
1005 DBG("Succeeded in closing stream %" PRIu64
, stream
->stream_handle
);
1009 int stream_init_packet(struct relay_stream
*stream
, size_t packet_size
,
1014 ASSERT_LOCKED(stream
->lock
);
1016 if (!stream
->stream_fd
|| !stream
->trace_chunk
) {
1017 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64
", channel_name = %s",
1018 stream
->stream_handle
, stream
->channel_name
);
1023 if (caa_likely(stream
->tracefile_size
== 0)) {
1024 /* No size limit set; nothing to check. */
1029 * Check if writing the new packet would exceed the maximal file size.
1031 if (caa_unlikely((stream
->tracefile_size_current
+ packet_size
) >
1032 stream
->tracefile_size
)) {
1033 const uint64_t new_file_index
=
1034 (stream
->tracefile_current_index
+ 1) %
1035 stream
->tracefile_count
;
1037 if (new_file_index
< stream
->tracefile_current_index
) {
1038 stream
->tracefile_wrapped_around
= true;
1040 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
1041 ", current_file_size = %" PRIu64
1042 ", packet_size = %zu, current_file_index = %" PRIu64
1043 " new_file_index = %" PRIu64
,
1044 stream
->stream_handle
,
1045 stream
->tracefile_size_current
, packet_size
,
1046 stream
->tracefile_current_index
, new_file_index
);
1047 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_WRITE
);
1048 stream
->tracefile_current_index
= new_file_index
;
1050 if (stream
->stream_fd
) {
1051 stream_fd_put(stream
->stream_fd
);
1052 stream
->stream_fd
= NULL
;
1054 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
1055 stream
->trace_chunk
, false, &stream
->stream_fd
);
1057 ERR("Failed to perform trace file rotation of stream %" PRIu64
,
1058 stream
->stream_handle
);
1063 * Reset current size because we just performed a stream
1066 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
1067 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
1068 stream
->tracefile_size_current
= 0;
1069 *file_rotated
= true;
1071 *file_rotated
= false;
1077 /* Note that the packet is not necessarily complete. */
1078 int stream_write(struct relay_stream
*stream
,
1079 const struct lttng_buffer_view
*packet
, size_t padding_len
)
1083 size_t padding_to_write
= padding_len
;
1084 char padding_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
1086 ASSERT_LOCKED(stream
->lock
);
1087 memset(padding_buffer
, 0,
1088 min(sizeof(padding_buffer
), padding_to_write
));
1090 if (!stream
->stream_fd
|| !stream
->trace_chunk
) {
1091 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64
", channel_name = %s",
1092 stream
->stream_handle
, stream
->channel_name
);
1097 write_ret
= lttng_write(stream
->stream_fd
->fd
,
1098 packet
->data
, packet
->size
);
1099 if (write_ret
!= packet
->size
) {
1100 PERROR("Failed to write to stream file of %sstream %" PRIu64
,
1101 stream
->is_metadata
? "metadata " : "",
1102 stream
->stream_handle
);
1108 while (padding_to_write
> 0) {
1109 const size_t padding_to_write_this_pass
=
1110 min(padding_to_write
, sizeof(padding_buffer
));
1112 write_ret
= lttng_write(stream
->stream_fd
->fd
,
1113 padding_buffer
, padding_to_write_this_pass
);
1114 if (write_ret
!= padding_to_write_this_pass
) {
1115 PERROR("Failed to write padding to file of %sstream %" PRIu64
,
1116 stream
->is_metadata
? "metadata " : "",
1117 stream
->stream_handle
);
1121 padding_to_write
-= padding_to_write_this_pass
;
1124 if (stream
->is_metadata
) {
1127 recv_len
= packet
? packet
->size
: 0;
1128 recv_len
+= padding_len
;
1129 stream
->metadata_received
+= recv_len
;
1131 stream
->no_new_metadata_notified
= false;
1135 DBG("Wrote to %sstream %" PRIu64
": data_length = %zu, padding_length = %zu",
1136 stream
->is_metadata
? "metadata " : "",
1137 stream
->stream_handle
,
1138 packet
? packet
->size
: (size_t) 0, padding_len
);
1144 * Update index after receiving a packet for a data stream.
1146 * Called with the stream lock held.
1148 * Return 0 on success else a negative value.
/*
 * stream_update_index: look up (or create) the relay index for
 * `net_seq_num`, bind it to the stream's index file together with the
 * current data offset, and try to flush it.
 *
 * - Requires stream->trace_chunk to be set (assert) and stream->lock held.
 * - The data offset stored on the index is stream->tracefile_size_current
 *   converted with htobe64().
 * - A new index file is created from stream->trace_chunk when `rotate_index`
 *   is true or the stream has no index file yet.
 * - On the successful-flush branch (the branch preceding `ret > 0`;
 *   presumably ret == 0, but the condition line is not shown), the tracefile
 *   array is rotated for reading, the index sequence count is committed and
 *   incremented, and received_packet_seq_num is set from the index's
 *   big-endian packet_seq_num.
 * - When relay_index_try_flush() returns > 0, `total_size` is recorded on
 *   the index.
 * - On create/set-file errors, the index self-reference is dropped with
 *   relay_index_put().
 *
 * NOTE(review): the lines writing `*flushed` and the return statements are
 * not visible in this listing. Presumably `*flushed` reports whether the
 * index was flushed; confirm.
 */
1150 int stream_update_index(struct relay_stream
*stream
, uint64_t net_seq_num
,
1151 bool rotate_index
, bool *flushed
, uint64_t total_size
)
1154 uint64_t data_offset
;
1155 struct relay_index
*index
;
1157 assert(stream
->trace_chunk
);
1158 ASSERT_LOCKED(stream
->lock
);
1159 /* Get data offset because we are about to update the index. */
1160 data_offset
= htobe64(stream
->tracefile_size_current
);
1162 DBG("handle_index_data: stream %" PRIu64
" net_seq_num %" PRIu64
" data offset %" PRIu64
,
1163 stream
->stream_handle
, net_seq_num
, stream
->tracefile_size_current
);
1166 * Lookup for an existing index for that stream id/sequence
1167 * number. If it exists, the control thread has already received the
1168 * data for it, thus we need to write it to disk.
1170 index
= relay_index_get_by_id_or_create(stream
, net_seq_num
);
/* (Re)create the index file on rotation or when none is open yet. */
1176 if (rotate_index
|| !stream
->index_file
) {
1177 ret
= create_index_file(stream
, stream
->trace_chunk
);
1179 ERR("Failed to create index file for stream %" PRIu64
,
1180 stream
->stream_handle
);
1181 /* Put self-ref for this index due to error. */
1182 relay_index_put(index
);
1188 if (relay_index_set_file(index
, stream
->index_file
, data_offset
)) {
1190 /* Put self-ref for this index due to error. */
1191 relay_index_put(index
);
/*
 * Attempt the flush; the result selects between the success bookkeeping,
 * recording total_size (ret > 0), and the error path.
 */
1196 ret
= relay_index_try_flush(index
);
1198 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_READ
);
1199 tracefile_array_commit_seq(stream
->tfa
, stream
->index_received_seqcount
);
1200 stream
->index_received_seqcount
++;
1201 LTTNG_OPTIONAL_SET(&stream
->received_packet_seq_num
,
1202 be64toh(index
->index_data
.packet_seq_num
));
1204 } else if (ret
> 0) {
1205 index
->total_size
= total_size
;
1212 * relay_index_try_flush is responsible for the self-reference
1213 * put of the index object on error.
1215 ERR("relay_index_try_flush error %d", ret
);
/*
 * stream_complete_packet: account for a fully received data packet of
 * `packet_total_size` bytes on the stream's current data file.
 *
 * Must be called with stream->lock held. tracefile_size_current is always
 * advanced by `packet_total_size`. When `index_flushed` is true, the new end
 * position is recorded as pos_after_last_complete_data_index, prev_index_seq
 * is set to `sequence_number`, and try_rotate_stream_index() is called.
 * prev_data_seq is then set to `sequence_number` and try_rotate_stream_data()
 * is called.
 *
 * NOTE(review): the error handling after try_rotate_stream_index() and the
 * return statement are not visible in this listing. An early exit may skip
 * the data-side update on error, and the function presumably returns the
 * rotation helpers' status. Confirm.
 */
1222 int stream_complete_packet(struct relay_stream
*stream
, size_t packet_total_size
,
1223 uint64_t sequence_number
, bool index_flushed
)
1227 ASSERT_LOCKED(stream
->lock
);
1229 stream
->tracefile_size_current
+= packet_total_size
;
/* Only a flushed index moves the "last complete data index" position. */
1230 if (index_flushed
) {
1231 stream
->pos_after_last_complete_data_index
=
1232 stream
->tracefile_size_current
;
1233 stream
->prev_index_seq
= sequence_number
;
1234 ret
= try_rotate_stream_index(stream
);
1240 stream
->prev_data_seq
= sequence_number
;
1241 ret
= try_rotate_stream_data(stream
);
/*
 * stream_add_index: process index information (`index_info`) received for
 * this stream.
 *
 * Must be called with stream->lock held.
 *
 * - A zero packet_size denotes a live beacon. beacon_ts_end takes the
 *   beacon's timestamp_end only if the stream has already received indexes
 *   (index_received_seqcount > 0) and none are in flight.
 *   NOTE(review): the lines ending the beacon path are not visible; confirm
 *   it exits early.
 * - On the (presumably non-beacon) path, beacon_ts_end is reset to -1ULL.
 *   ctf_stream_id is taken from `index_info` while it is still the -1ULL
 *   sentinel.
 * - The relay index for index_info->net_seq_num is looked up or created and
 *   filled via relay_index_set_control_data() using the session's minor
 *   protocol version. Then a flush is attempted.
 * - On the successful-flush branch (the one preceding `ret > 0`; its
 *   condition is not shown), the tracefile array is rotated for reading, the
 *   sequence count is committed and incremented, and
 *   pos_after_last_complete_data_index advances by the index's total_size.
 *   prev_index_seq and received_packet_seq_num are updated, and both
 *   try_rotate_stream_index() and try_rotate_stream_data() are invoked.
 *
 * NOTE(review): several guard/return lines are missing from this listing,
 * so the exact return values cannot be stated here. Confirm.
 */
1247 int stream_add_index(struct relay_stream
*stream
,
1248 const struct lttcomm_relayd_index
*index_info
)
1251 struct relay_index
*index
;
1253 ASSERT_LOCKED(stream
->lock
);
1255 DBG("stream_add_index for stream %" PRIu64
, stream
->stream_handle
);
1257 /* Live beacon handling */
1258 if (index_info
->packet_size
== 0) {
1259 DBG("Received live beacon for stream %" PRIu64
,
1260 stream
->stream_handle
);
1263 * Only flag a stream inactive when it has already
1264 * received data and no indexes are in flight.
1266 if (stream
->index_received_seqcount
> 0
1267 && stream
->indexes_in_flight
== 0) {
1268 stream
->beacon_ts_end
= index_info
->timestamp_end
;
1273 stream
->beacon_ts_end
= -1ULL;
/* -1ULL is the "CTF stream id not yet known" sentinel. */
1276 if (stream
->ctf_stream_id
== -1ULL) {
1277 stream
->ctf_stream_id
= index_info
->stream_id
;
1280 index
= relay_index_get_by_id_or_create(stream
, index_info
->net_seq_num
);
1283 ERR("Failed to get or create index %" PRIu64
,
1284 index_info
->net_seq_num
);
1287 if (relay_index_set_control_data(index
, index_info
,
1288 stream
->trace
->session
->minor
)) {
1289 ERR("set_index_control_data error");
1290 relay_index_put(index
);
/*
 * Attempt the flush; the result selects between the success bookkeeping,
 * the ret > 0 branch, and the error path.
 */
1294 ret
= relay_index_try_flush(index
);
1296 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_READ
);
1297 tracefile_array_commit_seq(stream
->tfa
, stream
->index_received_seqcount
);
1298 stream
->index_received_seqcount
++;
1299 stream
->pos_after_last_complete_data_index
+= index
->total_size
;
1300 stream
->prev_index_seq
= index_info
->net_seq_num
;
1301 LTTNG_OPTIONAL_SET(&stream
->received_packet_seq_num
,
1302 index_info
->packet_seq_num
);
1304 ret
= try_rotate_stream_index(stream
);
1308 ret
= try_rotate_stream_data(stream
);
1312 } else if (ret
> 0) {
1319 * relay_index_try_flush is responsible for the self-reference
1320 * put of the index object on error.
1322 ERR("relay_index_try_flush error %d", ret
);
/*
 * print_stream_indexes: debug helper that walks stream->indexes_ht and emits
 * one DBG line per index (pointer, net_seq_num, a refcount, stream handle,
 * trace id and session id).
 *
 * NOTE(review): the value printed for "refcount" is stream->ref.refcount,
 * i.e. the owning stream's refcount, not the index's own. Confirm whether
 * the index's refcount was intended.
 */
1329 static void print_stream_indexes(struct relay_stream
*stream
)
1331 struct lttng_ht_iter iter
;
1332 struct relay_index
*index
;
1335 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
, index
,
1337 DBG("index %p net_seq_num %" PRIu64
" refcount %ld"
1338 " stream %" PRIu64
" trace %" PRIu64
1339 " session %" PRIu64
,
/* NOTE(review): stream refcount, not the index's (see header). */
1342 stream
->ref
.refcount
,
1343 index
->stream
->stream_handle
,
1344 index
->stream
->trace
->id
,
1345 index
->stream
->trace
->session
->id
);
1350 int stream_reset_file(struct relay_stream
*stream
)
1352 ASSERT_LOCKED(stream
->lock
);
1354 if (stream
->stream_fd
) {
1355 stream_fd_put(stream
->stream_fd
);
1356 stream
->stream_fd
= NULL
;
1359 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
1360 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
1361 stream
->tracefile_size_current
= 0;
1362 stream
->prev_data_seq
= 0;
1363 stream
->prev_index_seq
= 0;
1364 /* Note that this does not reset the tracefile array. */
1365 stream
->tracefile_current_index
= 0;
1366 stream
->pos_after_last_complete_data_index
= 0;
1368 return stream_create_data_output_file_from_trace_chunk(stream
,
1369 stream
->trace_chunk
, true, &stream
->stream_fd
);
1372 void print_relay_streams(void)
1374 struct lttng_ht_iter iter
;
1375 struct relay_stream
*stream
;
1377 if (!relay_streams_ht
) {
1382 cds_lfht_for_each_entry(relay_streams_ht
->ht
, &iter
.iter
, stream
,
1384 if (!stream_get(stream
)) {
1387 DBG("stream %p refcount %ld stream %" PRIu64
" trace %" PRIu64
1388 " session %" PRIu64
,
1390 stream
->ref
.refcount
,
1391 stream
->stream_handle
,
1393 stream
->trace
->session
->id
);
1394 print_stream_indexes(stream
);