2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include <common/common.h>
23 #include <common/utils.h>
24 #include <common/defaults.h>
25 #include <common/sessiond-comm/relayd.h>
26 #include <urcu/rculist.h>
29 #include "lttng-relayd.h"
32 #include "viewer-stream.h"
34 #include <sys/types.h>
/*
 * Size, in bytes, of the on-stack scratch buffers used by the file I/O
 * helpers of this file (data copy during a rotation, zero padding on write).
 */
#define FILE_IO_STACK_BUFFER_SIZE (64 * 1024)
39 /* Should be called with RCU read-side lock held. */
40 bool stream_get(struct relay_stream
*stream
)
42 return urcu_ref_get_unless_zero(&stream
->ref
);
46 * Get stream from stream id from the streams hash table. Return stream
47 * if found else NULL. A stream reference is taken when a stream is
48 * returned. stream_put() must be called on that stream.
50 struct relay_stream
*stream_get_by_id(uint64_t stream_id
)
52 struct lttng_ht_node_u64
*node
;
53 struct lttng_ht_iter iter
;
54 struct relay_stream
*stream
= NULL
;
57 lttng_ht_lookup(relay_streams_ht
, &stream_id
, &iter
);
58 node
= lttng_ht_iter_get_node_u64(&iter
);
60 DBG("Relay stream %" PRIu64
" not found", stream_id
);
63 stream
= caa_container_of(node
, struct relay_stream
, node
);
64 if (!stream_get(stream
)) {
72 static void stream_complete_rotation(struct relay_stream
*stream
)
74 DBG("Rotation completed for stream %" PRIu64
, stream
->stream_handle
);
75 lttng_trace_chunk_put(stream
->trace_chunk
);
76 stream
->trace_chunk
= stream
->ongoing_rotation
.value
.next_trace_chunk
;
77 stream
->ongoing_rotation
= (typeof(stream
->ongoing_rotation
)) {};
80 static int stream_create_data_output_file_from_trace_chunk(
81 struct relay_stream
*stream
,
82 struct lttng_trace_chunk
*trace_chunk
,
84 struct stream_fd
**out_stream_fd
)
87 char stream_path
[LTTNG_PATH_MAX
];
88 enum lttng_trace_chunk_status status
;
89 const int flags
= O_RDWR
| O_CREAT
| O_TRUNC
;
90 const mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
;
92 ASSERT_LOCKED(stream
->lock
);
93 assert(stream
->trace_chunk
);
95 ret
= utils_stream_file_path(stream
->path_name
, stream
->channel_name
,
96 stream
->tracefile_size
, stream
->tracefile_current_index
,
97 NULL
, stream_path
, sizeof(stream_path
));
102 if (stream
->tracefile_wrapped_around
|| force_unlink
) {
104 * The on-disk ring-buffer has wrapped around.
105 * Newly created stream files will replace existing files. Since
106 * live clients may be consuming existing files, the file about
107 * to be replaced is unlinked in order to not overwrite its
110 status
= lttng_trace_chunk_unlink_file(trace_chunk
,
112 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
113 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
116 * Don't abort if the file doesn't exist, it is
117 * unexpected, but should not be a fatal error.
119 if (errno
!= ENOENT
) {
126 status
= lttng_trace_chunk_open_file(
127 trace_chunk
, stream_path
, flags
, mode
, &fd
);
128 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
129 ERR("Failed to open stream file \"%s\"", stream
->channel_name
);
134 *out_stream_fd
= stream_fd_create(fd
);
135 if (!*out_stream_fd
) {
137 PERROR("Error closing stream file descriptor %d", ret
);
146 static int stream_rotate_data_file(struct relay_stream
*stream
)
150 DBG("Rotating stream %" PRIu64
" data file",
151 stream
->stream_handle
);
153 if (stream
->stream_fd
) {
154 stream_fd_put(stream
->stream_fd
);
155 stream
->stream_fd
= NULL
;
158 stream
->tracefile_wrapped_around
= false;
159 stream
->tracefile_current_index
= 0;
161 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
162 struct stream_fd
*new_stream_fd
= NULL
;
163 enum lttng_trace_chunk_status chunk_status
;
165 chunk_status
= lttng_trace_chunk_create_subdirectory(
166 stream
->ongoing_rotation
.value
.next_trace_chunk
,
168 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
173 /* Rotate the data file. */
174 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
175 stream
->ongoing_rotation
.value
.next_trace_chunk
,
176 false, &new_stream_fd
);
177 stream
->stream_fd
= new_stream_fd
;
179 ERR("Failed to rotate stream data file");
183 stream
->tracefile_size_current
= 0;
184 stream
->pos_after_last_complete_data_index
= 0;
185 stream
->ongoing_rotation
.value
.data_rotated
= true;
187 if (stream
->ongoing_rotation
.value
.index_rotated
) {
188 /* Rotation completed; reset its state. */
189 stream_complete_rotation(stream
);
196 * If too much data has been written in a tracefile before we received the
197 * rotation command, we have to move the excess data to the new tracefile and
198 * perform the rotation. This can happen because the control and data
199 * connections are separate, the indexes as well as the commands arrive from
200 * the control connection and we have no control over the order so we could be
201 * in a situation where too much data has been received on the data connection
202 * before the rotation command on the control connection arrives.
204 static int rotate_truncate_stream(struct relay_stream
*stream
)
207 off_t lseek_ret
, previous_stream_copy_origin
;
208 uint64_t copy_bytes_left
, misplaced_data_size
;
209 bool acquired_reference
;
210 struct stream_fd
*previous_stream_fd
= NULL
;
211 struct lttng_trace_chunk
*previous_chunk
= NULL
;
213 ASSERT_LOCKED(stream
->lock
);
215 * Acquire a reference to the current trace chunk to ensure
216 * it is not reclaimed when `stream_rotate_data_file` is called.
217 * Failing to do so would violate the contract of the trace
218 * chunk API as an active file descriptor would outlive the
221 acquired_reference
= lttng_trace_chunk_get(stream
->trace_chunk
);
222 assert(acquired_reference
);
223 previous_chunk
= stream
->trace_chunk
;
226 * Steal the stream's reference to its stream_fd. A new
227 * stream_fd will be created when the rotation completes and
228 * the orinal stream_fd will be used to copy the "extra" data
231 assert(stream
->stream_fd
);
232 previous_stream_fd
= stream
->stream_fd
;
233 stream
->stream_fd
= NULL
;
235 assert(!stream
->is_metadata
);
236 assert(stream
->tracefile_size_current
>
237 stream
->pos_after_last_complete_data_index
);
238 misplaced_data_size
= stream
->tracefile_size_current
-
239 stream
->pos_after_last_complete_data_index
;
240 copy_bytes_left
= misplaced_data_size
;
241 previous_stream_copy_origin
= stream
->pos_after_last_complete_data_index
;
243 ret
= stream_rotate_data_file(stream
);
249 * Seek the current tracefile to the position at which the rotation
250 * should have occurred.
252 lseek_ret
= lseek(previous_stream_fd
->fd
, previous_stream_copy_origin
,
255 PERROR("Failed to seek to offset %" PRIu64
256 " while copying extra data received before a stream rotation",
257 (uint64_t) previous_stream_copy_origin
);
262 /* Move data from the old file to the new file. */
263 while (copy_bytes_left
) {
265 char copy_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
266 const off_t copy_size_this_pass
= min_t(
267 off_t
, copy_bytes_left
, sizeof(copy_buffer
));
269 io_ret
= lttng_read(previous_stream_fd
->fd
, copy_buffer
,
270 copy_size_this_pass
);
271 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
273 PERROR("Failed to read %" PRIu64
274 " bytes from fd %i in %s(), returned %zi",
276 previous_stream_fd
->fd
,
277 __FUNCTION__
, io_ret
);
279 ERR("Failed to read %" PRIu64
280 " bytes from fd %i in %s(), returned %zi",
282 previous_stream_fd
->fd
,
283 __FUNCTION__
, io_ret
);
289 io_ret
= lttng_write(stream
->stream_fd
->fd
, copy_buffer
,
290 copy_size_this_pass
);
291 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
293 PERROR("Failed to write %" PRIu64
294 " bytes from fd %i in %s(), returned %zi",
296 stream
->stream_fd
->fd
,
297 __FUNCTION__
, io_ret
);
299 ERR("Failed to write %" PRIu64
300 " bytes from fd %i in %s(), returned %zi",
302 stream
->stream_fd
->fd
,
303 __FUNCTION__
, io_ret
);
308 copy_bytes_left
-= copy_size_this_pass
;
311 /* Truncate the file to get rid of the excess data. */
312 ret
= ftruncate(previous_stream_fd
->fd
, previous_stream_copy_origin
);
314 PERROR("Failed to truncate current stream file to offset %" PRIu64
,
315 previous_stream_copy_origin
);
320 * Update the offset and FD of all the eventual indexes created by the
321 * data connection before the rotation command arrived.
323 ret
= relay_index_switch_all_files(stream
);
325 ERR("Failed to rotate index file");
329 stream
->tracefile_size_current
= misplaced_data_size
;
330 /* Index and data contents are back in sync. */
331 stream
->pos_after_last_complete_data_index
= 0;
334 lttng_trace_chunk_put(previous_chunk
);
335 stream_fd_put(previous_stream_fd
);
340 * Check if a stream's data file (as opposed to index) should be rotated
341 * (for session rotation).
342 * Must be called with the stream lock held.
344 * Return 0 on success, a negative value on error.
346 static int try_rotate_stream_data(struct relay_stream
*stream
)
350 if (caa_likely(!stream
->ongoing_rotation
.is_set
)) {
351 /* No rotation expected. */
355 if (stream
->ongoing_rotation
.value
.data_rotated
) {
356 /* Rotation of the data file has already occurred. */
360 if (stream
->prev_data_seq
== -1ULL ||
361 stream
->prev_data_seq
+ 1 < stream
->ongoing_rotation
.value
.seq_num
) {
363 * The next packet that will be written is not part of the next
366 DBG("Stream %" PRIu64
" not yet ready for rotation (rotate_at_seq_num = %" PRIu64
367 ", prev_data_seq = %" PRIu64
")",
368 stream
->stream_handle
,
369 stream
->ongoing_rotation
.value
.seq_num
,
370 stream
->prev_data_seq
);
372 } else if (stream
->prev_data_seq
> stream
->ongoing_rotation
.value
.seq_num
) {
374 * prev_data_seq is checked here since indexes and rotation
375 * commands are serialized with respect to each other.
377 DBG("Rotation after too much data has been written in tracefile "
378 "for stream %" PRIu64
", need to truncate before "
379 "rotating", stream
->stream_handle
);
380 ret
= rotate_truncate_stream(stream
);
382 ERR("Failed to truncate stream");
386 ret
= stream_rotate_data_file(stream
);
394 * Close the current index file if it is open, and create a new one.
396 * Return 0 on success, -1 on error.
398 static int create_index_file(struct relay_stream
*stream
,
399 struct lttng_trace_chunk
*chunk
)
402 uint32_t major
, minor
;
403 char *index_subpath
= NULL
;
404 enum lttng_trace_chunk_status status
;
406 ASSERT_LOCKED(stream
->lock
);
408 /* Put ref on previous index_file. */
409 if (stream
->index_file
) {
410 lttng_index_file_put(stream
->index_file
);
411 stream
->index_file
= NULL
;
413 major
= stream
->trace
->session
->major
;
414 minor
= stream
->trace
->session
->minor
;
420 ret
= asprintf(&index_subpath
, "%s/%s", stream
->path_name
,
426 status
= lttng_trace_chunk_create_subdirectory(chunk
,
429 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
433 stream
->index_file
= lttng_index_file_create_from_trace_chunk(
434 chunk
, stream
->path_name
,
435 stream
->channel_name
, stream
->tracefile_size
,
436 stream
->tracefile_current_index
,
437 lttng_to_index_major(major
, minor
),
438 lttng_to_index_minor(major
, minor
), true);
439 if (!stream
->index_file
) {
451 * Check if a stream's index file should be rotated (for session rotation).
452 * Must be called with the stream lock held.
454 * Return 0 on success, a negative value on error.
456 static int try_rotate_stream_index(struct relay_stream
*stream
)
460 if (!stream
->ongoing_rotation
.is_set
) {
461 /* No rotation expected. */
465 if (stream
->ongoing_rotation
.value
.index_rotated
) {
466 /* Rotation of the index has already occurred. */
470 if (stream
->prev_index_seq
== -1ULL ||
471 stream
->prev_index_seq
+ 1 < stream
->ongoing_rotation
.value
.seq_num
) {
472 DBG("Stream %" PRIu64
" index not yet ready for rotation (rotate_at_seq_num = %" PRIu64
", prev_index_seq = %" PRIu64
")",
473 stream
->stream_handle
,
474 stream
->ongoing_rotation
.value
.seq_num
,
475 stream
->prev_index_seq
);
478 /* The next index belongs to the new trace chunk; rotate. */
479 assert(stream
->prev_index_seq
+ 1 ==
480 stream
->ongoing_rotation
.value
.seq_num
);
481 DBG("Rotating stream %" PRIu64
" index file",
482 stream
->stream_handle
);
483 ret
= create_index_file(stream
,
484 stream
->ongoing_rotation
.value
.next_trace_chunk
);
485 stream
->ongoing_rotation
.value
.index_rotated
= true;
487 if (stream
->ongoing_rotation
.value
.data_rotated
&&
488 stream
->ongoing_rotation
.value
.index_rotated
) {
489 /* Rotation completed; reset its state. */
490 DBG("Rotation completed for stream %" PRIu64
,
491 stream
->stream_handle
);
492 stream_complete_rotation(stream
);
500 static int stream_set_trace_chunk(struct relay_stream
*stream
,
501 struct lttng_trace_chunk
*chunk
)
504 enum lttng_trace_chunk_status status
;
505 bool acquired_reference
;
506 struct stream_fd
*new_stream_fd
= NULL
;
508 status
= lttng_trace_chunk_create_subdirectory(chunk
,
510 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
515 lttng_trace_chunk_put(stream
->trace_chunk
);
516 acquired_reference
= lttng_trace_chunk_get(chunk
);
517 assert(acquired_reference
);
518 stream
->trace_chunk
= chunk
;
520 if (stream
->stream_fd
) {
521 stream_fd_put(stream
->stream_fd
);
522 stream
->stream_fd
= NULL
;
524 ret
= stream_create_data_output_file_from_trace_chunk(stream
, chunk
,
525 false, &new_stream_fd
);
526 stream
->stream_fd
= new_stream_fd
;
532 * We keep ownership of path_name and channel_name.
534 struct relay_stream
*stream_create(struct ctf_trace
*trace
,
535 uint64_t stream_handle
, char *path_name
,
536 char *channel_name
, uint64_t tracefile_size
,
537 uint64_t tracefile_count
)
540 struct relay_stream
*stream
= NULL
;
541 struct relay_session
*session
= trace
->session
;
542 bool acquired_reference
= false;
543 struct lttng_trace_chunk
*current_trace_chunk
;
545 stream
= zmalloc(sizeof(struct relay_stream
));
546 if (stream
== NULL
) {
547 PERROR("relay stream zmalloc");
551 stream
->stream_handle
= stream_handle
;
552 stream
->prev_data_seq
= -1ULL;
553 stream
->prev_index_seq
= -1ULL;
554 stream
->last_net_seq_num
= -1ULL;
555 stream
->ctf_stream_id
= -1ULL;
556 stream
->tracefile_size
= tracefile_size
;
557 stream
->tracefile_count
= tracefile_count
;
558 stream
->path_name
= path_name
;
559 stream
->channel_name
= channel_name
;
560 stream
->beacon_ts_end
= -1ULL;
561 lttng_ht_node_init_u64(&stream
->node
, stream
->stream_handle
);
562 pthread_mutex_init(&stream
->lock
, NULL
);
563 urcu_ref_init(&stream
->ref
);
564 ctf_trace_get(trace
);
565 stream
->trace
= trace
;
567 pthread_mutex_lock(&trace
->session
->lock
);
568 current_trace_chunk
= trace
->session
->current_trace_chunk
;
569 if (current_trace_chunk
) {
570 acquired_reference
= lttng_trace_chunk_get(current_trace_chunk
);
572 pthread_mutex_unlock(&trace
->session
->lock
);
573 if (!acquired_reference
) {
574 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
580 stream
->indexes_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
581 if (!stream
->indexes_ht
) {
582 ERR("Cannot created indexes_ht");
587 pthread_mutex_lock(&stream
->lock
);
588 ret
= stream_set_trace_chunk(stream
, current_trace_chunk
);
589 pthread_mutex_unlock(&stream
->lock
);
591 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
592 trace
->session
->session_name
,
593 stream
->channel_name
);
597 stream
->tfa
= tracefile_array_create(stream
->tracefile_count
);
603 stream
->is_metadata
= !strcmp(stream
->channel_name
,
604 DEFAULT_METADATA_NAME
);
605 stream
->in_recv_list
= true;
608 * Add the stream in the recv list of the session. Once the end stream
609 * message is received, all session streams are published.
611 pthread_mutex_lock(&session
->recv_list_lock
);
612 cds_list_add_rcu(&stream
->recv_node
, &session
->recv_list
);
613 session
->stream_count
++;
614 pthread_mutex_unlock(&session
->recv_list_lock
);
617 * Both in the ctf_trace object and the global stream ht since the data
618 * side of the relayd does not have the concept of session.
620 lttng_ht_add_unique_u64(relay_streams_ht
, &stream
->node
);
621 stream
->in_stream_ht
= true;
623 DBG("Relay new stream added %s with ID %" PRIu64
, stream
->channel_name
,
624 stream
->stream_handle
);
629 if (stream
->stream_fd
) {
630 stream_fd_put(stream
->stream_fd
);
631 stream
->stream_fd
= NULL
;
636 lttng_trace_chunk_put(current_trace_chunk
);
641 * path_name and channel_name need to be freed explicitly here
642 * because we cannot rely on stream_put().
650 * Called with the session lock held.
652 void stream_publish(struct relay_stream
*stream
)
654 struct relay_session
*session
;
656 pthread_mutex_lock(&stream
->lock
);
657 if (stream
->published
) {
661 session
= stream
->trace
->session
;
663 pthread_mutex_lock(&session
->recv_list_lock
);
664 if (stream
->in_recv_list
) {
665 cds_list_del_rcu(&stream
->recv_node
);
666 stream
->in_recv_list
= false;
668 pthread_mutex_unlock(&session
->recv_list_lock
);
670 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
671 cds_list_add_rcu(&stream
->stream_node
, &stream
->trace
->stream_list
);
672 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
674 stream
->published
= true;
676 pthread_mutex_unlock(&stream
->lock
);
680 * Stream must be protected by holding the stream lock or by virtue of being
681 * called from stream_destroy.
683 static void stream_unpublish(struct relay_stream
*stream
)
685 if (stream
->in_stream_ht
) {
686 struct lttng_ht_iter iter
;
689 iter
.iter
.node
= &stream
->node
.node
;
690 ret
= lttng_ht_del(relay_streams_ht
, &iter
);
692 stream
->in_stream_ht
= false;
694 if (stream
->published
) {
695 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
696 cds_list_del_rcu(&stream
->stream_node
);
697 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
698 stream
->published
= false;
702 static void stream_destroy(struct relay_stream
*stream
)
704 if (stream
->indexes_ht
) {
706 * Calling lttng_ht_destroy in call_rcu worker thread so
707 * we don't hold the RCU read-side lock while calling
710 lttng_ht_destroy(stream
->indexes_ht
);
713 tracefile_array_destroy(stream
->tfa
);
715 free(stream
->path_name
);
716 free(stream
->channel_name
);
720 static void stream_destroy_rcu(struct rcu_head
*rcu_head
)
722 struct relay_stream
*stream
=
723 caa_container_of(rcu_head
, struct relay_stream
, rcu_node
);
725 stream_destroy(stream
);
729 * No need to take stream->lock since this is only called on the final
730 * stream_put which ensures that a single thread may act on the stream.
732 static void stream_release(struct urcu_ref
*ref
)
734 struct relay_stream
*stream
=
735 caa_container_of(ref
, struct relay_stream
, ref
);
736 struct relay_session
*session
;
738 session
= stream
->trace
->session
;
740 DBG("Releasing stream id %" PRIu64
, stream
->stream_handle
);
742 pthread_mutex_lock(&session
->recv_list_lock
);
743 session
->stream_count
--;
744 if (stream
->in_recv_list
) {
745 cds_list_del_rcu(&stream
->recv_node
);
746 stream
->in_recv_list
= false;
748 pthread_mutex_unlock(&session
->recv_list_lock
);
750 stream_unpublish(stream
);
752 if (stream
->stream_fd
) {
753 stream_fd_put(stream
->stream_fd
);
754 stream
->stream_fd
= NULL
;
756 if (stream
->index_file
) {
757 lttng_index_file_put(stream
->index_file
);
758 stream
->index_file
= NULL
;
761 ctf_trace_put(stream
->trace
);
762 stream
->trace
= NULL
;
764 stream_complete_rotation(stream
);
765 lttng_trace_chunk_put(stream
->trace_chunk
);
766 stream
->trace_chunk
= NULL
;
768 call_rcu(&stream
->rcu_node
, stream_destroy_rcu
);
771 void stream_put(struct relay_stream
*stream
)
774 assert(stream
->ref
.refcount
!= 0);
776 * Wait until we have processed all the stream packets before
777 * actually putting our last stream reference.
779 urcu_ref_put(&stream
->ref
, stream_release
);
783 int stream_set_pending_rotation(struct relay_stream
*stream
,
784 struct lttng_trace_chunk
*next_trace_chunk
,
785 uint64_t rotation_sequence_number
)
788 const struct relay_stream_rotation rotation
= {
789 .seq_num
= rotation_sequence_number
,
790 .next_trace_chunk
= next_trace_chunk
,
793 if (stream
->ongoing_rotation
.is_set
) {
794 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
799 if (next_trace_chunk
) {
800 const bool reference_acquired
=
801 lttng_trace_chunk_get(next_trace_chunk
);
803 assert(reference_acquired
);
805 LTTNG_OPTIONAL_SET(&stream
->ongoing_rotation
, rotation
);
807 DBG("Setting pending rotation: stream_id = %" PRIu64
", rotation_seq_num = %" PRIu64
,
808 stream
->stream_handle
, rotation_sequence_number
);
809 if (stream
->is_metadata
) {
811 * A metadata stream has no index; consider it already rotated.
813 stream
->ongoing_rotation
.value
.index_rotated
= true;
814 ret
= stream_rotate_data_file(stream
);
816 ret
= try_rotate_stream_data(stream
);
821 ret
= try_rotate_stream_index(stream
);
830 void try_stream_close(struct relay_stream
*stream
)
832 bool session_aborted
;
833 struct relay_session
*session
= stream
->trace
->session
;
835 DBG("Trying to close stream %" PRIu64
, stream
->stream_handle
);
837 pthread_mutex_lock(&session
->lock
);
838 session_aborted
= session
->aborted
;
839 pthread_mutex_unlock(&session
->lock
);
841 pthread_mutex_lock(&stream
->lock
);
843 * Can be called concurently by connection close and reception of last
846 if (stream
->closed
) {
847 pthread_mutex_unlock(&stream
->lock
);
848 DBG("closing stream %" PRIu64
" aborted since it is already marked as closed", stream
->stream_handle
);
852 stream
->close_requested
= true;
854 if (stream
->last_net_seq_num
== -1ULL) {
856 * Handle connection close without explicit stream close
859 * We can be clever about indexes partially received in
860 * cases where we received the data socket part, but not
861 * the control socket part: since we're currently closing
862 * the stream on behalf of the control socket, we *know*
863 * there won't be any more control information for this
864 * socket. Therefore, we can destroy all indexes for
865 * which we have received only the file descriptor (from
866 * data socket). This takes care of consumerd crashes
867 * between sending the data and control information for
868 * a packet. Since those are sent in that order, we take
869 * care of consumerd crashes.
871 DBG("relay_index_close_partial_fd");
872 relay_index_close_partial_fd(stream
);
874 * Use the highest net_seq_num we currently have pending
875 * As end of stream indicator. Leave last_net_seq_num
876 * at -1ULL if we cannot find any index.
878 stream
->last_net_seq_num
= relay_index_find_last(stream
);
879 DBG("Updating stream->last_net_seq_num to %" PRIu64
, stream
->last_net_seq_num
);
880 /* Fall-through into the next check. */
883 if (stream
->last_net_seq_num
!= -1ULL &&
884 ((int64_t) (stream
->prev_data_seq
- stream
->last_net_seq_num
)) < 0
885 && !session_aborted
) {
887 * Don't close since we still have data pending. This
888 * handles cases where an explicit close command has
889 * been received for this stream, and cases where the
890 * connection has been closed, and we are awaiting for
891 * index information from the data socket. It is
892 * therefore expected that all the index fd information
893 * we need has already been received on the control
894 * socket. Matching index information from data socket
895 * should be Expected Soon(TM).
897 * TODO: We should implement a timer to garbage collect
898 * streams after a timeout to be resilient against a
899 * consumerd implementation that would not match this
902 pthread_mutex_unlock(&stream
->lock
);
903 DBG("closing stream %" PRIu64
" aborted since it still has data pending", stream
->stream_handle
);
907 * We received all the indexes we can expect.
909 stream_unpublish(stream
);
910 stream
->closed
= true;
911 /* Relay indexes are only used by the "consumer/sessiond" end. */
912 relay_index_close_all(stream
);
913 pthread_mutex_unlock(&stream
->lock
);
914 DBG("Succeeded in closing stream %" PRIu64
, stream
->stream_handle
);
918 int stream_init_packet(struct relay_stream
*stream
, size_t packet_size
,
923 ASSERT_LOCKED(stream
->lock
);
924 if (caa_likely(stream
->tracefile_size
== 0)) {
925 /* No size limit set; nothing to check. */
930 * Check if writing the new packet would exceed the maximal file size.
932 if (caa_unlikely((stream
->tracefile_size_current
+ packet_size
) >
933 stream
->tracefile_size
)) {
934 const uint64_t new_file_index
=
935 (stream
->tracefile_current_index
+ 1) %
936 stream
->tracefile_count
;
938 if (new_file_index
< stream
->tracefile_current_index
) {
939 stream
->tracefile_wrapped_around
= true;
941 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
942 ", current_file_size = %" PRIu64
943 ", packet_size = %zu, current_file_index = %" PRIu64
944 " new_file_index = %" PRIu64
,
945 stream
->stream_handle
,
946 stream
->tracefile_size_current
, packet_size
,
947 stream
->tracefile_current_index
, new_file_index
);
948 tracefile_array_file_rotate(stream
->tfa
);
949 stream
->tracefile_current_index
= new_file_index
;
951 if (stream
->stream_fd
) {
952 stream_fd_put(stream
->stream_fd
);
953 stream
->stream_fd
= NULL
;
955 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
956 stream
->trace_chunk
, false, &stream
->stream_fd
);
958 ERR("Failed to perform trace file rotation of stream %" PRIu64
,
959 stream
->stream_handle
);
964 * Reset current size because we just performed a stream
967 stream
->tracefile_size_current
= 0;
968 *file_rotated
= true;
970 *file_rotated
= false;
976 /* Note that the packet is not necessarily complete. */
977 int stream_write(struct relay_stream
*stream
,
978 const struct lttng_buffer_view
*packet
, size_t padding_len
)
982 size_t padding_to_write
= padding_len
;
983 char padding_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
985 ASSERT_LOCKED(stream
->lock
);
986 memset(padding_buffer
, 0,
987 min(sizeof(padding_buffer
), padding_to_write
));
990 write_ret
= lttng_write(stream
->stream_fd
->fd
,
991 packet
->data
, packet
->size
);
992 if (write_ret
!= packet
->size
) {
993 PERROR("Failed to write to stream file of %sstream %" PRIu64
,
994 stream
->is_metadata
? "metadata " : "",
995 stream
->stream_handle
);
1001 while (padding_to_write
> 0) {
1002 const size_t padding_to_write_this_pass
=
1003 min(padding_to_write
, sizeof(padding_buffer
));
1005 write_ret
= lttng_write(stream
->stream_fd
->fd
,
1006 padding_buffer
, padding_to_write_this_pass
);
1007 if (write_ret
!= padding_to_write_this_pass
) {
1008 PERROR("Failed to write padding to file of %sstream %" PRIu64
,
1009 stream
->is_metadata
? "metadata " : "",
1010 stream
->stream_handle
);
1014 padding_to_write
-= padding_to_write_this_pass
;
1017 if (stream
->is_metadata
) {
1018 stream
->metadata_received
+= packet
->size
+ padding_len
;
1021 DBG("Wrote to %sstream %" PRIu64
": data_length = %zu, padding_length = %zu",
1022 stream
->is_metadata
? "metadata " : "",
1023 stream
->stream_handle
,
1024 packet
? packet
->size
: (size_t) 0, padding_len
);
1030 * Update index after receiving a packet for a data stream.
1032 * Called with the stream lock held.
1034 * Return 0 on success else a negative value.
1036 int stream_update_index(struct relay_stream
*stream
, uint64_t net_seq_num
,
1037 bool rotate_index
, bool *flushed
, uint64_t total_size
)
1040 uint64_t data_offset
;
1041 struct relay_index
*index
;
1043 ASSERT_LOCKED(stream
->lock
);
1044 /* Get data offset because we are about to update the index. */
1045 data_offset
= htobe64(stream
->tracefile_size_current
);
1047 DBG("handle_index_data: stream %" PRIu64
" net_seq_num %" PRIu64
" data offset %" PRIu64
,
1048 stream
->stream_handle
, net_seq_num
, stream
->tracefile_size_current
);
1051 * Lookup for an existing index for that stream id/sequence
1052 * number. If it exists, the control thread has already received the
1053 * data for it, thus we need to write it to disk.
1055 index
= relay_index_get_by_id_or_create(stream
, net_seq_num
);
1061 if (rotate_index
|| !stream
->index_file
) {
1062 ret
= create_index_file(stream
, stream
->trace_chunk
);
1064 ERR("Failed to create index file for stream %" PRIu64
,
1065 stream
->stream_handle
);
1066 /* Put self-ref for this index due to error. */
1067 relay_index_put(index
);
1073 if (relay_index_set_file(index
, stream
->index_file
, data_offset
)) {
1075 /* Put self-ref for this index due to error. */
1076 relay_index_put(index
);
1081 ret
= relay_index_try_flush(index
);
1083 tracefile_array_commit_seq(stream
->tfa
);
1084 stream
->index_received_seqcount
++;
1086 } else if (ret
> 0) {
1087 index
->total_size
= total_size
;
1094 * relay_index_try_flush is responsible for the self-reference
1095 * put of the index object on error.
1097 ERR("relay_index_try_flush error %d", ret
);
1104 int stream_complete_packet(struct relay_stream
*stream
, size_t packet_total_size
,
1105 uint64_t sequence_number
, bool index_flushed
)
1109 ASSERT_LOCKED(stream
->lock
);
1111 stream
->tracefile_size_current
+= packet_total_size
;
1112 if (index_flushed
) {
1113 stream
->pos_after_last_complete_data_index
=
1114 stream
->tracefile_size_current
;
1115 stream
->prev_index_seq
= sequence_number
;
1116 ret
= try_rotate_stream_index(stream
);
1122 stream
->prev_data_seq
= sequence_number
;
1123 ret
= try_rotate_stream_data(stream
);
1131 int stream_add_index(struct relay_stream
*stream
,
1132 const struct lttcomm_relayd_index
*index_info
)
1135 struct relay_index
*index
;
1137 ASSERT_LOCKED(stream
->lock
);
1139 /* Live beacon handling */
1140 if (index_info
->packet_size
== 0) {
1141 DBG("Received live beacon for stream %" PRIu64
,
1142 stream
->stream_handle
);
1145 * Only flag a stream inactive when it has already
1146 * received data and no indexes are in flight.
1148 if (stream
->index_received_seqcount
> 0
1149 && stream
->indexes_in_flight
== 0) {
1150 stream
->beacon_ts_end
= index_info
->timestamp_end
;
1155 stream
->beacon_ts_end
= -1ULL;
1158 if (stream
->ctf_stream_id
== -1ULL) {
1159 stream
->ctf_stream_id
= index_info
->stream_id
;
1162 index
= relay_index_get_by_id_or_create(stream
, index_info
->net_seq_num
);
1165 ERR("Failed to get or create index %" PRIu64
,
1166 index_info
->net_seq_num
);
1169 if (relay_index_set_control_data(index
, index_info
,
1170 stream
->trace
->session
->minor
)) {
1171 ERR("set_index_control_data error");
1172 relay_index_put(index
);
1176 ret
= relay_index_try_flush(index
);
1178 tracefile_array_commit_seq(stream
->tfa
);
1179 stream
->index_received_seqcount
++;
1180 stream
->pos_after_last_complete_data_index
+= index
->total_size
;
1181 stream
->prev_index_seq
= index_info
->net_seq_num
;
1183 ret
= try_rotate_stream_index(stream
);
1187 } else if (ret
> 0) {
1194 * relay_index_try_flush is responsible for the self-reference
1195 * put of the index object on error.
1197 ERR("relay_index_try_flush error %d", ret
);
1204 static void print_stream_indexes(struct relay_stream
*stream
)
1206 struct lttng_ht_iter iter
;
1207 struct relay_index
*index
;
1210 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
, index
,
1212 DBG("index %p net_seq_num %" PRIu64
" refcount %ld"
1213 " stream %" PRIu64
" trace %" PRIu64
1214 " session %" PRIu64
,
1217 stream
->ref
.refcount
,
1218 index
->stream
->stream_handle
,
1219 index
->stream
->trace
->id
,
1220 index
->stream
->trace
->session
->id
);
1225 int stream_reset_file(struct relay_stream
*stream
)
1227 ASSERT_LOCKED(stream
->lock
);
1229 if (stream
->stream_fd
) {
1230 stream_fd_put(stream
->stream_fd
);
1231 stream
->stream_fd
= NULL
;
1234 stream
->tracefile_size_current
= 0;
1235 stream
->prev_data_seq
= 0;
1236 stream
->prev_index_seq
= 0;
1237 /* Note that this does not reset the tracefile array. */
1238 stream
->tracefile_current_index
= 0;
1239 stream
->pos_after_last_complete_data_index
= 0;
1241 return stream_create_data_output_file_from_trace_chunk(stream
,
1242 stream
->trace_chunk
, true, &stream
->stream_fd
);
1245 void print_relay_streams(void)
1247 struct lttng_ht_iter iter
;
1248 struct relay_stream
*stream
;
1250 if (!relay_streams_ht
) {
1255 cds_lfht_for_each_entry(relay_streams_ht
->ht
, &iter
.iter
, stream
,
1257 if (!stream_get(stream
)) {
1260 DBG("stream %p refcount %ld stream %" PRIu64
" trace %" PRIu64
1261 " session %" PRIu64
,
1263 stream
->ref
.refcount
,
1264 stream
->stream_handle
,
1266 stream
->trace
->session
->id
);
1267 print_stream_indexes(stream
);