X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=fe52702c77489209dcb43208206bb6e7dd1fb04c;hb=a12886a5728de2b48c895abb8a9799e111e8f40b;hp=adb044f1d3ebd4b6b6d58a035c6e4d9eece706b9;hpb=1b95be80706a2f046cf526825b9da1f87a94b1ec;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index adb044f1d..fe52702c7 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -71,6 +71,7 @@ #include "session.h" #include "stream.h" #include "connection.h" +#include "tracefile-array.h" /* command line options */ char *opt_output_path; @@ -1265,9 +1266,17 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + + /* + * Set last_net_seq_num before the close flag. Required by data + * pending check. + */ pthread_mutex_lock(&stream->lock); - stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + pthread_mutex_unlock(&stream->lock); + + stream_close(stream); + if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -1286,7 +1295,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, viewer_stream_put(vstream); } } - pthread_mutex_unlock(&stream->lock); stream_put(stream); end: @@ -1890,7 +1898,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, * Only flag a stream inactive when it has already * received data and no indexes are in flight. */ - if (stream->total_index_received > 0 + if (stream->index_received_seqcount > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); @@ -1918,7 +1926,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, } ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* no flush. */ ret = 0; @@ -2091,7 +2100,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, fd = index_create_file(stream->path_name, stream->channel_name, -1, -1, stream->tracefile_size, - stream->current_tracefile_id); + tracefile_array_get_file_index_head(stream->tfa)); if (fd < 0) { ret = -1; /* Put self-ref for this index due to error. */ @@ -2120,7 +2129,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* No flush. */ ret = 0; @@ -2204,35 +2214,23 @@ static int relay_process_data(struct relay_connection *conn) if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { - uint64_t new_id; + uint64_t old_id, new_id; + + old_id = tracefile_array_get_file_index_head(stream->tfa); + tracefile_array_file_rotate(stream->tfa); + + /* new_id is updated by utils_rotate_stream_file. */ + new_id = old_id; - new_id = (stream->current_tracefile_id + 1) % - stream->tracefile_count; - /* - * Move viewer oldest available data position forward if - * we are overwriting a tracefile. - */ - if (new_id == stream->oldest_tracefile_id) { - stream->oldest_tracefile_id = - (stream->oldest_tracefile_id + 1) % - stream->tracefile_count; - } ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, stream->tracefile_count, -1, -1, stream->stream_fd->fd, - &stream->current_tracefile_id, - &stream->stream_fd->fd); + &new_id, &stream->stream_fd->fd); if (ret < 0) { ERR("Rotating stream output file"); goto end_stream_unlock; } - stream->current_tracefile_seq++; - if (stream->current_tracefile_seq - - stream->oldest_tracefile_seq >= - stream->tracefile_count) { - stream->oldest_tracefile_seq++; - } /* * Reset current size because we just performed a stream * rotation.