2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * Copyright (C) 2019 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * SPDX-License-Identifier: GPL-2.0-only
13 #include <common/common.h>
14 #include <common/defaults.h>
15 #include <common/fs-handle.h>
16 #include <common/sessiond-comm/relayd.h>
17 #include <common/utils.h>
19 #include <urcu/rculist.h>
21 #include "lttng-relayd.h"
24 #include "viewer-stream.h"
26 #include <sys/types.h>
29 #define FILE_IO_STACK_BUFFER_SIZE 65536
31 /* Should be called with RCU read-side lock held. */
32 bool stream_get(struct relay_stream
*stream
)
34 ASSERT_RCU_READ_LOCKED();
36 return urcu_ref_get_unless_zero(&stream
->ref
);
40 * Get stream from stream id from the streams hash table. Return stream
41 * if found else NULL. A stream reference is taken when a stream is
42 * returned. stream_put() must be called on that stream.
44 struct relay_stream
*stream_get_by_id(uint64_t stream_id
)
46 struct lttng_ht_node_u64
*node
;
47 struct lttng_ht_iter iter
;
48 struct relay_stream
*stream
= NULL
;
51 lttng_ht_lookup(relay_streams_ht
, &stream_id
, &iter
);
52 node
= lttng_ht_iter_get_node_u64(&iter
);
54 DBG("Relay stream %" PRIu64
" not found", stream_id
);
57 stream
= caa_container_of(node
, struct relay_stream
, node
);
58 if (!stream_get(stream
)) {
66 static void stream_complete_rotation(struct relay_stream
*stream
)
68 DBG("Rotation completed for stream %" PRIu64
, stream
->stream_handle
);
69 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
70 tracefile_array_reset(stream
->tfa
);
71 tracefile_array_commit_seq(stream
->tfa
,
72 stream
->index_received_seqcount
);
74 lttng_trace_chunk_put(stream
->trace_chunk
);
75 stream
->trace_chunk
= stream
->ongoing_rotation
.value
.next_trace_chunk
;
76 stream
->ongoing_rotation
= LTTNG_OPTIONAL_INIT_UNSET
;
77 stream
->completed_rotation_count
++;
80 static int stream_create_data_output_file_from_trace_chunk(
81 struct relay_stream
*stream
,
82 struct lttng_trace_chunk
*trace_chunk
,
84 struct fs_handle
**out_file
)
87 char stream_path
[LTTNG_PATH_MAX
];
88 enum lttng_trace_chunk_status status
;
89 const int flags
= O_RDWR
| O_CREAT
| O_TRUNC
;
90 const mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
;
92 ASSERT_LOCKED(stream
->lock
);
94 ret
= utils_stream_file_path(stream
->path_name
, stream
->channel_name
,
95 stream
->tracefile_size
, stream
->tracefile_current_index
,
96 NULL
, stream_path
, sizeof(stream_path
));
101 if (stream
->tracefile_wrapped_around
|| force_unlink
) {
103 * The on-disk ring-buffer has wrapped around.
104 * Newly created stream files will replace existing files. Since
105 * live clients may be consuming existing files, the file about
106 * to be replaced is unlinked in order to not overwrite its
109 status
= (lttng_trace_chunk_status
) lttng_trace_chunk_unlink_file(trace_chunk
,
111 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
112 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
115 * Don't abort if the file doesn't exist, it is
116 * unexpected, but should not be a fatal error.
118 if (errno
!= ENOENT
) {
125 status
= lttng_trace_chunk_open_fs_handle(trace_chunk
, stream_path
,
126 flags
, mode
, out_file
, false);
127 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
128 ERR("Failed to open stream file \"%s\"", stream
->channel_name
);
136 static int stream_rotate_data_file(struct relay_stream
*stream
)
140 DBG("Rotating stream %" PRIu64
" data file with size %" PRIu64
,
141 stream
->stream_handle
, stream
->tracefile_size_current
);
144 fs_handle_close(stream
->file
);
148 stream
->tracefile_wrapped_around
= false;
149 stream
->tracefile_current_index
= 0;
151 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
152 enum lttng_trace_chunk_status chunk_status
;
154 chunk_status
= lttng_trace_chunk_create_subdirectory(
155 stream
->ongoing_rotation
.value
.next_trace_chunk
,
157 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
162 /* Rotate the data file. */
163 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
164 stream
->ongoing_rotation
.value
.next_trace_chunk
,
165 false, &stream
->file
);
167 ERR("Failed to rotate stream data file");
171 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
172 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
173 stream
->tracefile_size_current
= 0;
174 stream
->pos_after_last_complete_data_index
= 0;
175 stream
->ongoing_rotation
.value
.data_rotated
= true;
177 if (stream
->ongoing_rotation
.value
.index_rotated
) {
178 /* Rotation completed; reset its state. */
179 stream_complete_rotation(stream
);
186 * If too much data has been written in a tracefile before we received the
187 * rotation command, we have to move the excess data to the new tracefile and
188 * perform the rotation. This can happen because the control and data
189 * connections are separate, the indexes as well as the commands arrive from
190 * the control connection and we have no control over the order so we could be
191 * in a situation where too much data has been received on the data connection
192 * before the rotation command on the control connection arrives.
194 static int rotate_truncate_stream(struct relay_stream
*stream
)
197 off_t lseek_ret
, previous_stream_copy_origin
;
198 uint64_t copy_bytes_left
, misplaced_data_size
;
199 bool acquired_reference
;
200 struct fs_handle
*previous_stream_file
= NULL
;
201 struct lttng_trace_chunk
*previous_chunk
= NULL
;
203 if (!LTTNG_OPTIONAL_GET(stream
->ongoing_rotation
).next_trace_chunk
) {
204 ERR("Protocol error encoutered in %s(): stream rotation "
205 "sequence number is before the current sequence number "
206 "and the next trace chunk is unset. Honoring this "
207 "rotation command would result in data loss",
213 ASSERT_LOCKED(stream
->lock
);
215 * Acquire a reference to the current trace chunk to ensure
216 * it is not reclaimed when `stream_rotate_data_file` is called.
217 * Failing to do so would violate the contract of the trace
218 * chunk API as an active file descriptor would outlive the
221 acquired_reference
= lttng_trace_chunk_get(stream
->trace_chunk
);
222 LTTNG_ASSERT(acquired_reference
);
223 previous_chunk
= stream
->trace_chunk
;
226 * Steal the stream's reference to its stream_fd. A new
227 * stream_fd will be created when the rotation completes and
228 * the original stream_fd will be used to copy the "extra" data
231 LTTNG_ASSERT(stream
->file
);
232 previous_stream_file
= stream
->file
;
235 LTTNG_ASSERT(!stream
->is_metadata
);
236 LTTNG_ASSERT(stream
->tracefile_size_current
>
237 stream
->pos_after_last_complete_data_index
);
238 misplaced_data_size
= stream
->tracefile_size_current
-
239 stream
->pos_after_last_complete_data_index
;
240 copy_bytes_left
= misplaced_data_size
;
241 previous_stream_copy_origin
= stream
->pos_after_last_complete_data_index
;
243 ret
= stream_rotate_data_file(stream
);
248 LTTNG_ASSERT(stream
->file
);
250 * Seek the current tracefile to the position at which the rotation
251 * should have occurred.
253 lseek_ret
= fs_handle_seek(previous_stream_file
, previous_stream_copy_origin
, SEEK_SET
);
255 PERROR("Failed to seek to offset %" PRIu64
256 " while copying extra data received before a stream rotation",
257 (uint64_t) previous_stream_copy_origin
);
262 /* Move data from the old file to the new file. */
263 while (copy_bytes_left
) {
265 char copy_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
266 const off_t copy_size_this_pass
= std::min
<uint64_t>(copy_bytes_left
, sizeof(copy_buffer
));
268 io_ret
= fs_handle_read(previous_stream_file
, copy_buffer
,
269 copy_size_this_pass
);
270 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
272 PERROR("Failed to read %" PRIu64
273 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
275 __FUNCTION__
, io_ret
,
276 stream
->stream_handle
);
278 ERR("Failed to read %" PRIu64
279 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
281 __FUNCTION__
, io_ret
,
282 stream
->stream_handle
);
288 io_ret
= fs_handle_write(
289 stream
->file
, copy_buffer
, copy_size_this_pass
);
290 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
292 PERROR("Failed to write %" PRIu64
293 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
295 __FUNCTION__
, io_ret
,
296 stream
->stream_handle
);
298 ERR("Failed to write %" PRIu64
299 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
301 __FUNCTION__
, io_ret
,
302 stream
->stream_handle
);
307 copy_bytes_left
-= copy_size_this_pass
;
310 /* Truncate the file to get rid of the excess data. */
311 ret
= fs_handle_truncate(
312 previous_stream_file
, previous_stream_copy_origin
);
314 PERROR("Failed to truncate current stream file to offset %" PRIu64
,
315 previous_stream_copy_origin
);
320 * Update the offset and FD of all the eventual indexes created by the
321 * data connection before the rotation command arrived.
323 ret
= relay_index_switch_all_files(stream
);
325 ERR("Failed to rotate index file");
329 stream
->tracefile_size_current
= misplaced_data_size
;
330 /* Index and data contents are back in sync. */
331 stream
->pos_after_last_complete_data_index
= 0;
334 lttng_trace_chunk_put(previous_chunk
);
339 * Check if a stream's data file (as opposed to index) should be rotated
340 * (for session rotation).
341 * Must be called with the stream lock held.
343 * Return 0 on success, a negative value on error.
345 static int try_rotate_stream_data(struct relay_stream
*stream
)
349 if (caa_likely(!stream
->ongoing_rotation
.is_set
)) {
350 /* No rotation expected. */
354 if (stream
->ongoing_rotation
.value
.data_rotated
) {
355 /* Rotation of the data file has already occurred. */
359 DBG("%s: Stream %" PRIu64
360 " (rotate_at_index_packet_seq_num = %" PRIu64
361 ", rotate_at_prev_data_net_seq = %" PRIu64
362 ", prev_data_seq = %" PRIu64
")",
363 __func__
, stream
->stream_handle
,
364 stream
->ongoing_rotation
.value
.packet_seq_num
,
365 stream
->ongoing_rotation
.value
.prev_data_net_seq
,
366 stream
->prev_data_seq
);
368 if (stream
->prev_data_seq
== -1ULL ||
369 stream
->ongoing_rotation
.value
.prev_data_net_seq
== -1ULL ||
370 stream
->prev_data_seq
<
371 stream
->ongoing_rotation
.value
.prev_data_net_seq
) {
373 * The next packet that will be written is not part of the next
376 DBG("Stream %" PRIu64
" data not yet ready for rotation "
377 "(rotate_at_index_packet_seq_num = %" PRIu64
378 ", rotate_at_prev_data_net_seq = %" PRIu64
379 ", prev_data_seq = %" PRIu64
")",
380 stream
->stream_handle
,
381 stream
->ongoing_rotation
.value
.packet_seq_num
,
382 stream
->ongoing_rotation
.value
.prev_data_net_seq
,
383 stream
->prev_data_seq
);
385 } else if (stream
->prev_data_seq
> stream
->ongoing_rotation
.value
.prev_data_net_seq
) {
387 * prev_data_seq is checked here since indexes and rotation
388 * commands are serialized with respect to each other.
390 DBG("Rotation after too much data has been written in tracefile "
391 "for stream %" PRIu64
", need to truncate before "
392 "rotating", stream
->stream_handle
);
393 ret
= rotate_truncate_stream(stream
);
395 ERR("Failed to truncate stream");
399 ret
= stream_rotate_data_file(stream
);
407 * Close the current index file if it is open, and create a new one.
409 * Return 0 on success, -1 on error.
411 static int create_index_file(struct relay_stream
*stream
,
412 struct lttng_trace_chunk
*chunk
)
415 uint32_t major
, minor
;
416 char *index_subpath
= NULL
;
417 enum lttng_trace_chunk_status status
;
419 ASSERT_LOCKED(stream
->lock
);
421 /* Put ref on previous index_file. */
422 if (stream
->index_file
) {
423 lttng_index_file_put(stream
->index_file
);
424 stream
->index_file
= NULL
;
426 major
= stream
->trace
->session
->major
;
427 minor
= stream
->trace
->session
->minor
;
433 ret
= asprintf(&index_subpath
, "%s/%s", stream
->path_name
,
439 status
= lttng_trace_chunk_create_subdirectory(chunk
,
442 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
446 status
= lttng_index_file_create_from_trace_chunk(
447 chunk
, stream
->path_name
,
448 stream
->channel_name
, stream
->tracefile_size
,
449 stream
->tracefile_current_index
,
450 lttng_to_index_major(major
, minor
),
451 lttng_to_index_minor(major
, minor
), true,
452 &stream
->index_file
);
453 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
465 * Check if a stream's index file should be rotated (for session rotation).
466 * Must be called with the stream lock held.
468 * Return 0 on success, a negative value on error.
470 static int try_rotate_stream_index(struct relay_stream
*stream
)
474 if (!stream
->ongoing_rotation
.is_set
) {
475 /* No rotation expected. */
479 if (stream
->ongoing_rotation
.value
.index_rotated
) {
480 /* Rotation of the index has already occurred. */
484 DBG("%s: Stream %" PRIu64
485 " (rotate_at_packet_seq_num = %" PRIu64
486 ", received_packet_seq_num = "
487 "(value = %" PRIu64
", is_set = %" PRIu8
"))",
488 __func__
, stream
->stream_handle
,
489 stream
->ongoing_rotation
.value
.packet_seq_num
,
490 stream
->received_packet_seq_num
.value
,
491 stream
->received_packet_seq_num
.is_set
);
493 if (!stream
->received_packet_seq_num
.is_set
||
494 LTTNG_OPTIONAL_GET(stream
->received_packet_seq_num
) + 1 <
495 stream
->ongoing_rotation
.value
.packet_seq_num
) {
496 DBG("Stream %" PRIu64
" index not yet ready for rotation "
497 "(rotate_at_packet_seq_num = %" PRIu64
498 ", received_packet_seq_num = "
499 "(value = %" PRIu64
", is_set = %" PRIu8
"))",
500 stream
->stream_handle
,
501 stream
->ongoing_rotation
.value
.packet_seq_num
,
502 stream
->received_packet_seq_num
.value
,
503 stream
->received_packet_seq_num
.is_set
);
507 * The next index belongs to the new trace chunk; rotate.
508 * In overwrite mode, the packet seq num may jump over the
511 LTTNG_ASSERT(LTTNG_OPTIONAL_GET(stream
->received_packet_seq_num
) + 1 >=
512 stream
->ongoing_rotation
.value
.packet_seq_num
);
513 DBG("Rotating stream %" PRIu64
" index file",
514 stream
->stream_handle
);
515 if (stream
->index_file
) {
516 lttng_index_file_put(stream
->index_file
);
517 stream
->index_file
= NULL
;
519 stream
->ongoing_rotation
.value
.index_rotated
= true;
522 * Set the rotation pivot position for the data, now that we have the
523 * net_seq_num matching the packet_seq_num index pivot position.
525 stream
->ongoing_rotation
.value
.prev_data_net_seq
=
526 stream
->prev_index_seq
;
527 if (stream
->ongoing_rotation
.value
.data_rotated
&&
528 stream
->ongoing_rotation
.value
.index_rotated
) {
529 /* Rotation completed; reset its state. */
530 DBG("Rotation completed for stream %" PRIu64
,
531 stream
->stream_handle
);
532 stream_complete_rotation(stream
);
540 static int stream_set_trace_chunk(struct relay_stream
*stream
,
541 struct lttng_trace_chunk
*chunk
)
544 enum lttng_trace_chunk_status status
;
545 bool acquired_reference
;
547 status
= lttng_trace_chunk_create_subdirectory(chunk
,
549 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
554 lttng_trace_chunk_put(stream
->trace_chunk
);
555 acquired_reference
= lttng_trace_chunk_get(chunk
);
556 LTTNG_ASSERT(acquired_reference
);
557 stream
->trace_chunk
= chunk
;
560 fs_handle_close(stream
->file
);
563 ret
= stream_create_data_output_file_from_trace_chunk(stream
, chunk
,
564 false, &stream
->file
);
570 * We keep ownership of path_name and channel_name.
572 struct relay_stream
*stream_create(struct ctf_trace
*trace
,
573 uint64_t stream_handle
, char *path_name
,
574 char *channel_name
, uint64_t tracefile_size
,
575 uint64_t tracefile_count
)
578 struct relay_stream
*stream
= NULL
;
579 struct relay_session
*session
= trace
->session
;
580 bool acquired_reference
= false;
581 struct lttng_trace_chunk
*current_trace_chunk
;
583 stream
= (relay_stream
*) zmalloc(sizeof(struct relay_stream
));
584 if (stream
== NULL
) {
585 PERROR("relay stream zmalloc");
589 stream
->stream_handle
= stream_handle
;
590 stream
->prev_data_seq
= -1ULL;
591 stream
->prev_index_seq
= -1ULL;
592 stream
->last_net_seq_num
= -1ULL;
593 stream
->ctf_stream_id
= -1ULL;
594 stream
->tracefile_size
= tracefile_size
;
595 stream
->tracefile_count
= tracefile_count
;
596 stream
->path_name
= path_name
;
597 stream
->channel_name
= channel_name
;
598 stream
->beacon_ts_end
= -1ULL;
599 lttng_ht_node_init_u64(&stream
->node
, stream
->stream_handle
);
600 pthread_mutex_init(&stream
->lock
, NULL
);
601 urcu_ref_init(&stream
->ref
);
602 ctf_trace_get(trace
);
603 stream
->trace
= trace
;
605 pthread_mutex_lock(&trace
->session
->lock
);
606 current_trace_chunk
= trace
->session
->current_trace_chunk
;
607 if (current_trace_chunk
) {
608 acquired_reference
= lttng_trace_chunk_get(current_trace_chunk
);
610 pthread_mutex_unlock(&trace
->session
->lock
);
611 if (!acquired_reference
) {
612 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
618 stream
->indexes_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
619 if (!stream
->indexes_ht
) {
620 ERR("Cannot created indexes_ht");
625 pthread_mutex_lock(&stream
->lock
);
626 ret
= stream_set_trace_chunk(stream
, current_trace_chunk
);
627 pthread_mutex_unlock(&stream
->lock
);
629 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
630 trace
->session
->session_name
,
631 stream
->channel_name
);
635 stream
->tfa
= tracefile_array_create(stream
->tracefile_count
);
641 stream
->is_metadata
= !strcmp(stream
->channel_name
,
642 DEFAULT_METADATA_NAME
);
643 stream
->in_recv_list
= true;
646 * Add the stream in the recv list of the session. Once the end stream
647 * message is received, all session streams are published.
649 pthread_mutex_lock(&session
->recv_list_lock
);
650 cds_list_add_rcu(&stream
->recv_node
, &session
->recv_list
);
651 session
->stream_count
++;
652 pthread_mutex_unlock(&session
->recv_list_lock
);
655 * Both in the ctf_trace object and the global stream ht since the data
656 * side of the relayd does not have the concept of session.
658 lttng_ht_add_unique_u64(relay_streams_ht
, &stream
->node
);
659 stream
->in_stream_ht
= true;
661 DBG("Relay new stream added %s with ID %" PRIu64
, stream
->channel_name
,
662 stream
->stream_handle
);
668 fs_handle_close(stream
->file
);
674 if (acquired_reference
) {
675 lttng_trace_chunk_put(current_trace_chunk
);
681 * path_name and channel_name need to be freed explicitly here
682 * because we cannot rely on stream_put().
690 * Called with the session lock held.
692 void stream_publish(struct relay_stream
*stream
)
694 struct relay_session
*session
;
696 pthread_mutex_lock(&stream
->lock
);
697 if (stream
->published
) {
701 session
= stream
->trace
->session
;
703 pthread_mutex_lock(&session
->recv_list_lock
);
704 if (stream
->in_recv_list
) {
705 cds_list_del_rcu(&stream
->recv_node
);
706 stream
->in_recv_list
= false;
708 pthread_mutex_unlock(&session
->recv_list_lock
);
710 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
711 cds_list_add_rcu(&stream
->stream_node
, &stream
->trace
->stream_list
);
712 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
714 stream
->published
= true;
716 pthread_mutex_unlock(&stream
->lock
);
720 * Stream must be protected by holding the stream lock or by virtue of being
721 * called from stream_destroy.
723 static void stream_unpublish(struct relay_stream
*stream
)
725 if (stream
->in_stream_ht
) {
726 struct lttng_ht_iter iter
;
729 iter
.iter
.node
= &stream
->node
.node
;
730 ret
= lttng_ht_del(relay_streams_ht
, &iter
);
732 stream
->in_stream_ht
= false;
734 if (stream
->published
) {
735 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
736 cds_list_del_rcu(&stream
->stream_node
);
737 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
738 stream
->published
= false;
742 static void stream_destroy(struct relay_stream
*stream
)
744 if (stream
->indexes_ht
) {
746 * Calling lttng_ht_destroy in call_rcu worker thread so
747 * we don't hold the RCU read-side lock while calling
750 lttng_ht_destroy(stream
->indexes_ht
);
753 tracefile_array_destroy(stream
->tfa
);
755 free(stream
->path_name
);
756 free(stream
->channel_name
);
760 static void stream_destroy_rcu(struct rcu_head
*rcu_head
)
762 struct relay_stream
*stream
=
763 caa_container_of(rcu_head
, struct relay_stream
, rcu_node
);
765 stream_destroy(stream
);
769 * No need to take stream->lock since this is only called on the final
770 * stream_put which ensures that a single thread may act on the stream.
772 static void stream_release(struct urcu_ref
*ref
)
774 struct relay_stream
*stream
=
775 caa_container_of(ref
, struct relay_stream
, ref
);
776 struct relay_session
*session
;
778 session
= stream
->trace
->session
;
780 DBG("Releasing stream id %" PRIu64
, stream
->stream_handle
);
782 pthread_mutex_lock(&session
->recv_list_lock
);
783 session
->stream_count
--;
784 if (stream
->in_recv_list
) {
785 cds_list_del_rcu(&stream
->recv_node
);
786 stream
->in_recv_list
= false;
788 pthread_mutex_unlock(&session
->recv_list_lock
);
790 stream_unpublish(stream
);
793 fs_handle_close(stream
->file
);
796 if (stream
->index_file
) {
797 lttng_index_file_put(stream
->index_file
);
798 stream
->index_file
= NULL
;
801 ctf_trace_put(stream
->trace
);
802 stream
->trace
= NULL
;
804 stream_complete_rotation(stream
);
805 lttng_trace_chunk_put(stream
->trace_chunk
);
806 stream
->trace_chunk
= NULL
;
808 call_rcu(&stream
->rcu_node
, stream_destroy_rcu
);
811 void stream_put(struct relay_stream
*stream
)
814 LTTNG_ASSERT(stream
->ref
.refcount
!= 0);
816 * Wait until we have processed all the stream packets before
817 * actually putting our last stream reference.
819 urcu_ref_put(&stream
->ref
, stream_release
);
823 int stream_set_pending_rotation(struct relay_stream
*stream
,
824 struct lttng_trace_chunk
*next_trace_chunk
,
825 uint64_t rotation_sequence_number
)
828 const struct relay_stream_rotation rotation
= {
829 .data_rotated
= false,
830 .index_rotated
= false,
831 .packet_seq_num
= rotation_sequence_number
,
832 .prev_data_net_seq
= -1ULL,
833 .next_trace_chunk
= next_trace_chunk
,
836 if (stream
->ongoing_rotation
.is_set
) {
837 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
842 if (next_trace_chunk
) {
843 const bool reference_acquired
=
844 lttng_trace_chunk_get(next_trace_chunk
);
846 LTTNG_ASSERT(reference_acquired
);
848 LTTNG_OPTIONAL_SET(&stream
->ongoing_rotation
, rotation
);
850 DBG("Setting pending rotation: stream_id = %" PRIu64
851 ", rotate_at_packet_seq_num = %" PRIu64
,
852 stream
->stream_handle
, rotation_sequence_number
);
853 if (stream
->is_metadata
) {
855 * A metadata stream has no index; consider it already rotated.
857 stream
->ongoing_rotation
.value
.index_rotated
= true;
858 if (next_trace_chunk
) {
860 * The metadata will be received again in the new chunk.
862 stream
->metadata_received
= 0;
864 ret
= stream_rotate_data_file(stream
);
866 ret
= try_rotate_stream_index(stream
);
871 ret
= try_rotate_stream_data(stream
);
880 void try_stream_close(struct relay_stream
*stream
)
882 bool session_aborted
;
883 struct relay_session
*session
= stream
->trace
->session
;
885 DBG("Trying to close stream %" PRIu64
, stream
->stream_handle
);
887 pthread_mutex_lock(&session
->lock
);
888 session_aborted
= session
->aborted
;
889 pthread_mutex_unlock(&session
->lock
);
891 pthread_mutex_lock(&stream
->lock
);
893 * Can be called concurrently by connection close and reception of last
896 if (stream
->closed
) {
897 pthread_mutex_unlock(&stream
->lock
);
898 DBG("closing stream %" PRIu64
" aborted since it is already marked as closed", stream
->stream_handle
);
902 stream
->close_requested
= true;
904 if (stream
->last_net_seq_num
== -1ULL) {
906 * Handle connection close without explicit stream close
909 * We can be clever about indexes partially received in
910 * cases where we received the data socket part, but not
911 * the control socket part: since we're currently closing
912 * the stream on behalf of the control socket, we *know*
913 * there won't be any more control information for this
914 * socket. Therefore, we can destroy all indexes for
915 * which we have received only the file descriptor (from
916 * data socket). This takes care of consumerd crashes
917 * between sending the data and control information for
918 * a packet. Since those are sent in that order, we take
919 * care of consumerd crashes.
921 DBG("relay_index_close_partial_fd");
922 relay_index_close_partial_fd(stream
);
924 * Use the highest net_seq_num we currently have pending
925 * as end of stream indicator. Leave last_net_seq_num
926 * at -1ULL if we cannot find any index.
928 stream
->last_net_seq_num
= relay_index_find_last(stream
);
929 DBG("Updating stream->last_net_seq_num to %" PRIu64
, stream
->last_net_seq_num
);
930 /* Fall-through into the next check. */
933 if (stream
->last_net_seq_num
!= -1ULL &&
934 ((int64_t) (stream
->prev_data_seq
- stream
->last_net_seq_num
)) < 0
935 && !session_aborted
) {
937 * Don't close since we still have data pending. This
938 * handles cases where an explicit close command has
939 * been received for this stream, and cases where the
940 * connection has been closed, and we are awaiting for
941 * index information from the data socket. It is
942 * therefore expected that all the index fd information
943 * we need has already been received on the control
944 * socket. Matching index information from data socket
945 * should be Expected Soon(TM).
947 * TODO: We should implement a timer to garbage collect
948 * streams after a timeout to be resilient against a
949 * consumerd implementation that would not match this
952 pthread_mutex_unlock(&stream
->lock
);
953 DBG("closing stream %" PRIu64
" aborted since it still has data pending", stream
->stream_handle
);
957 * We received all the indexes we can expect.
959 stream_unpublish(stream
);
960 stream
->closed
= true;
961 /* Relay indexes are only used by the "consumer/sessiond" end. */
962 relay_index_close_all(stream
);
965 * If we are closed by an application exiting (per-pid buffers),
966 * we need to put our reference on the stream trace chunk right
967 * away, because otherwise still holding the reference on the
968 * trace chunk could allow a viewer stream (which holds a reference
969 * to the stream) to postpone destroy waiting for the chunk to cease
970 * to exist endlessly until the viewer is detached.
973 /* Put stream fd before put chunk. */
975 fs_handle_close(stream
->file
);
978 if (stream
->index_file
) {
979 lttng_index_file_put(stream
->index_file
);
980 stream
->index_file
= NULL
;
982 lttng_trace_chunk_put(stream
->trace_chunk
);
983 stream
->trace_chunk
= NULL
;
984 pthread_mutex_unlock(&stream
->lock
);
985 DBG("Succeeded in closing stream %" PRIu64
, stream
->stream_handle
);
989 int stream_init_packet(struct relay_stream
*stream
, size_t packet_size
,
994 ASSERT_LOCKED(stream
->lock
);
996 if (!stream
->file
|| !stream
->trace_chunk
) {
997 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64
", channel_name = %s",
998 stream
->stream_handle
, stream
->channel_name
);
1003 if (caa_likely(stream
->tracefile_size
== 0)) {
1004 /* No size limit set; nothing to check. */
1009 * Check if writing the new packet would exceed the maximal file size.
1011 if (caa_unlikely((stream
->tracefile_size_current
+ packet_size
) >
1012 stream
->tracefile_size
)) {
1013 const uint64_t new_file_index
=
1014 (stream
->tracefile_current_index
+ 1) %
1015 stream
->tracefile_count
;
1017 if (new_file_index
< stream
->tracefile_current_index
) {
1018 stream
->tracefile_wrapped_around
= true;
1020 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
1021 ", current_file_size = %" PRIu64
1022 ", packet_size = %zu, current_file_index = %" PRIu64
1023 " new_file_index = %" PRIu64
,
1024 stream
->stream_handle
,
1025 stream
->tracefile_size_current
, packet_size
,
1026 stream
->tracefile_current_index
, new_file_index
);
1027 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_WRITE
);
1028 stream
->tracefile_current_index
= new_file_index
;
1031 fs_handle_close(stream
->file
);
1032 stream
->file
= NULL
;
1034 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
1035 stream
->trace_chunk
, false, &stream
->file
);
1037 ERR("Failed to perform trace file rotation of stream %" PRIu64
,
1038 stream
->stream_handle
);
1043 * Reset current size because we just performed a stream
1046 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
1047 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
1048 stream
->tracefile_size_current
= 0;
1049 *file_rotated
= true;
1051 *file_rotated
= false;
1057 /* Note that the packet is not necessarily complete. */
1058 int stream_write(struct relay_stream
*stream
,
1059 const struct lttng_buffer_view
*packet
, size_t padding_len
)
1063 size_t padding_to_write
= padding_len
;
1064 char padding_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
1066 ASSERT_LOCKED(stream
->lock
);
1067 memset(padding_buffer
, 0,
1068 std::min(sizeof(padding_buffer
), padding_to_write
));
1070 if (!stream
->file
|| !stream
->trace_chunk
) {
1071 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64
", channel_name = %s",
1072 stream
->stream_handle
, stream
->channel_name
);
1077 write_ret
= fs_handle_write(
1078 stream
->file
, packet
->data
, packet
->size
);
1079 if (write_ret
!= packet
->size
) {
1080 PERROR("Failed to write to stream file of %sstream %" PRIu64
,
1081 stream
->is_metadata
? "metadata " : "",
1082 stream
->stream_handle
);
1088 while (padding_to_write
> 0) {
1089 const size_t padding_to_write_this_pass
=
1090 std::min(padding_to_write
, sizeof(padding_buffer
));
1092 write_ret
= fs_handle_write(stream
->file
, padding_buffer
,
1093 padding_to_write_this_pass
);
1094 if (write_ret
!= padding_to_write_this_pass
) {
1095 PERROR("Failed to write padding to file of %sstream %" PRIu64
,
1096 stream
->is_metadata
? "metadata " : "",
1097 stream
->stream_handle
);
1101 padding_to_write
-= padding_to_write_this_pass
;
1104 if (stream
->is_metadata
) {
1107 recv_len
= packet
? packet
->size
: 0;
1108 recv_len
+= padding_len
;
1109 stream
->metadata_received
+= recv_len
;
1111 stream
->no_new_metadata_notified
= false;
1115 DBG("Wrote to %sstream %" PRIu64
": data_length = %zu, padding_length = %zu",
1116 stream
->is_metadata
? "metadata " : "",
1117 stream
->stream_handle
,
1118 packet
? packet
->size
: (size_t) 0, padding_len
);
1124 * Update index after receiving a packet for a data stream.
1126 * Called with the stream lock held.
1128 * Return 0 on success else a negative value.
1130 int stream_update_index(struct relay_stream
*stream
, uint64_t net_seq_num
,
1131 bool rotate_index
, bool *flushed
, uint64_t total_size
)
1134 uint64_t data_offset
;
1135 struct relay_index
*index
;
1137 LTTNG_ASSERT(stream
->trace_chunk
);
1138 ASSERT_LOCKED(stream
->lock
);
1139 /* Get data offset because we are about to update the index. */
1140 data_offset
= htobe64(stream
->tracefile_size_current
);
1142 DBG("handle_index_data: stream %" PRIu64
" net_seq_num %" PRIu64
" data offset %" PRIu64
,
1143 stream
->stream_handle
, net_seq_num
, stream
->tracefile_size_current
);
1146 * Lookup for an existing index for that stream id/sequence
1147 * number. If it exists, the control thread has already received the
1148 * data for it, thus we need to write it to disk.
1150 index
= relay_index_get_by_id_or_create(stream
, net_seq_num
);
1156 if (rotate_index
|| !stream
->index_file
) {
1157 ret
= create_index_file(stream
, stream
->trace_chunk
);
1159 ERR("Failed to create index file for stream %" PRIu64
,
1160 stream
->stream_handle
);
1161 /* Put self-ref for this index due to error. */
1162 relay_index_put(index
);
1168 if (relay_index_set_file(index
, stream
->index_file
, data_offset
)) {
1170 /* Put self-ref for this index due to error. */
1171 relay_index_put(index
);
1176 ret
= relay_index_try_flush(index
);
1178 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_READ
);
1179 tracefile_array_commit_seq(stream
->tfa
, stream
->index_received_seqcount
);
1180 stream
->index_received_seqcount
++;
1181 LTTNG_OPTIONAL_SET(&stream
->received_packet_seq_num
,
1182 be64toh(index
->index_data
.packet_seq_num
));
1184 } else if (ret
> 0) {
1185 index
->total_size
= total_size
;
1192 * relay_index_try_flush is responsible for the self-reference
1193 * put of the index object on error.
1195 ERR("relay_index_try_flush error %d", ret
);
1202 int stream_complete_packet(struct relay_stream
*stream
, size_t packet_total_size
,
1203 uint64_t sequence_number
, bool index_flushed
)
1207 ASSERT_LOCKED(stream
->lock
);
1209 stream
->tracefile_size_current
+= packet_total_size
;
1210 if (index_flushed
) {
1211 stream
->pos_after_last_complete_data_index
=
1212 stream
->tracefile_size_current
;
1213 stream
->prev_index_seq
= sequence_number
;
1214 ret
= try_rotate_stream_index(stream
);
1220 stream
->prev_data_seq
= sequence_number
;
1221 ret
= try_rotate_stream_data(stream
);
1227 int stream_add_index(struct relay_stream
*stream
,
1228 const struct lttcomm_relayd_index
*index_info
)
1231 struct relay_index
*index
;
1233 ASSERT_LOCKED(stream
->lock
);
1235 DBG("stream_add_index for stream %" PRIu64
, stream
->stream_handle
);
1237 /* Live beacon handling */
1238 if (index_info
->packet_size
== 0) {
1239 DBG("Received live beacon for stream %" PRIu64
,
1240 stream
->stream_handle
);
1243 * Only flag a stream inactive when it has already
1244 * received data and no indexes are in flight.
1246 if (stream
->index_received_seqcount
> 0
1247 && stream
->indexes_in_flight
== 0) {
1248 stream
->beacon_ts_end
= index_info
->timestamp_end
;
1253 stream
->beacon_ts_end
= -1ULL;
1256 if (stream
->ctf_stream_id
== -1ULL) {
1257 stream
->ctf_stream_id
= index_info
->stream_id
;
1260 index
= relay_index_get_by_id_or_create(stream
, index_info
->net_seq_num
);
1263 ERR("Failed to get or create index %" PRIu64
,
1264 index_info
->net_seq_num
);
1267 if (relay_index_set_control_data(index
, index_info
,
1268 stream
->trace
->session
->minor
)) {
1269 ERR("set_index_control_data error");
1270 relay_index_put(index
);
1274 ret
= relay_index_try_flush(index
);
1276 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_READ
);
1277 tracefile_array_commit_seq(stream
->tfa
, stream
->index_received_seqcount
);
1278 stream
->index_received_seqcount
++;
1279 stream
->pos_after_last_complete_data_index
+= index
->total_size
;
1280 stream
->prev_index_seq
= index_info
->net_seq_num
;
1281 LTTNG_OPTIONAL_SET(&stream
->received_packet_seq_num
,
1282 index_info
->packet_seq_num
);
1284 ret
= try_rotate_stream_index(stream
);
1288 ret
= try_rotate_stream_data(stream
);
1292 } else if (ret
> 0) {
1299 * relay_index_try_flush is responsible for the self-reference
1300 * put of the index object on error.
1302 ERR("relay_index_try_flush error %d", ret
);
1309 static void print_stream_indexes(struct relay_stream
*stream
)
1311 struct lttng_ht_iter iter
;
1312 struct relay_index
*index
;
1315 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
, index
,
1317 DBG("index %p net_seq_num %" PRIu64
" refcount %ld"
1318 " stream %" PRIu64
" trace %" PRIu64
1319 " session %" PRIu64
,
1322 stream
->ref
.refcount
,
1323 index
->stream
->stream_handle
,
1324 index
->stream
->trace
->id
,
1325 index
->stream
->trace
->session
->id
);
1330 int stream_reset_file(struct relay_stream
*stream
)
1332 ASSERT_LOCKED(stream
->lock
);
1337 ret
= fs_handle_close(stream
->file
);
1339 ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64
,
1340 stream
->channel_name
,
1341 stream
->stream_handle
);
1343 stream
->file
= NULL
;
1346 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
1347 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
1348 stream
->tracefile_size_current
= 0;
1349 stream
->prev_data_seq
= 0;
1350 stream
->prev_index_seq
= 0;
1351 /* Note that this does not reset the tracefile array. */
1352 stream
->tracefile_current_index
= 0;
1353 stream
->pos_after_last_complete_data_index
= 0;
1355 return stream_create_data_output_file_from_trace_chunk(stream
,
1356 stream
->trace_chunk
, true, &stream
->file
);
1359 void print_relay_streams(void)
1361 struct lttng_ht_iter iter
;
1362 struct relay_stream
*stream
;
1364 if (!relay_streams_ht
) {
1369 cds_lfht_for_each_entry(relay_streams_ht
->ht
, &iter
.iter
, stream
,
1371 if (!stream_get(stream
)) {
1374 DBG("stream %p refcount %ld stream %" PRIu64
" trace %" PRIu64
1375 " session %" PRIu64
,
1377 stream
->ref
.refcount
,
1378 stream
->stream_handle
,
1380 stream
->trace
->session
->id
);
1381 print_stream_indexes(stream
);