From: Jérémie Galarneau Date: Thu, 23 Jun 2022 18:28:01 +0000 (-0400) Subject: Clean-up: consumerd: reduce duplication of stream output close code X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=d119bd017a99d56ad36901ce8f2175a8ea3b5e5e;p=lttng-tools.git Clean-up: consumerd: reduce duplication of stream output close code The kernel space consumer implements its own version of a stream_close operation where it could use the common consumer code. This change separates the tear down of the buffers (munmap, in the kernel case) from the closing of the output stream in consumer_stream_close(). This change allows the kernel snapshot code to re-use the common close function instead of rolling its own `finalize_snapshot_stream`. Signed-off-by: Jérémie Galarneau Change-Id: I48e3193ceb3c15ddd8c6fcecd37ab60b793f7e66 --- diff --git a/src/common/consumer/consumer-stream.cpp b/src/common/consumer/consumer-stream.cpp index bb0ec0a24..b30e9aac0 100644 --- a/src/common/consumer/consumer-stream.cpp +++ b/src/common/consumer/consumer-stream.cpp @@ -850,69 +850,19 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, * The consumer data lock MUST be acquired. * The stream lock MUST be acquired. */ -void consumer_stream_close(struct lttng_consumer_stream *stream) +void consumer_stream_close_output(struct lttng_consumer_stream *stream) { - int ret; struct consumer_relayd_sock_pair *relayd; LTTNG_ASSERT(stream); - switch (the_consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - if (stream->mmap_base != NULL) { - ret = munmap(stream->mmap_base, stream->mmap_len); - if (ret != 0) { - PERROR("munmap"); - } - } - - if (stream->wait_fd >= 0) { - ret = close(stream->wait_fd); - if (ret) { - PERROR("close"); - } - stream->wait_fd = -1; - } - if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) { - utils_close_pipe(stream->splice_pipe); - } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - { - /* - * Special case for the metadata since the wait fd is an internal pipe - * polled in the metadata thread. - */ - if (stream->metadata_flag && stream->chan->monitor) { - int rpipe = stream->ust_metadata_poll_pipe[0]; - - /* - * This will stop the channel timer if one and close the write side - * of the metadata poll pipe. - */ - lttng_ustconsumer_close_metadata(stream->chan); - if (rpipe >= 0) { - ret = close(rpipe); - if (ret < 0) { - PERROR("closing metadata pipe read side"); - } - stream->ust_metadata_poll_pipe[0] = -1; - } - } - break; - } - default: - ERR("Unknown consumer_data type"); - abort(); - } - /* Close output fd. Could be a socket or local file at this point. */ if (stream->out_fd >= 0) { - ret = close(stream->out_fd); + const auto ret = close(stream->out_fd); if (ret) { - PERROR("close"); + PERROR("Failed to close stream output file descriptor"); } + stream->out_fd = -1; } @@ -929,7 +879,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { consumer_stream_relayd_close(stream, relayd); + stream->net_seq_idx = -1ULL; } + rcu_read_unlock(); } @@ -1001,9 +953,54 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: + if (stream->mmap_base != NULL) { + const auto ret = munmap(stream->mmap_base, stream->mmap_len); + + if (ret != 0) { + PERROR("munmap"); + } + } + + if (stream->wait_fd >= 0) { + const auto ret = close(stream->wait_fd); + + if (ret) { + PERROR("close"); + } + + stream->wait_fd = -1; + } + + if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) { + utils_close_pipe(stream->splice_pipe); + } + break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: + /* + * Special case for the metadata since the wait fd is an internal pipe + * polled in the metadata thread. + */ + if (stream->metadata_flag && stream->chan->monitor) { + const auto rpipe = stream->ust_metadata_poll_pipe[0]; + + /* + * This will stop the channel timer if one and close the write side + * of the metadata poll pipe. + */ + lttng_ustconsumer_close_metadata(stream->chan); + if (rpipe >= 0) { + const auto ret = close(rpipe); + + if (ret < 0) { + PERROR("closing metadata pipe read side"); + } + + stream->ust_metadata_poll_pipe[0] = -1; + } + } + lttng_ustconsumer_del_stream(stream); break; default: @@ -1024,7 +1021,7 @@ static void destroy_close_stream(struct lttng_consumer_stream *stream) /* Destroy tracer buffers of the stream. */ consumer_stream_destroy_buffers(stream); /* Close down everything including the relayd if one. */ - consumer_stream_close(stream); + consumer_stream_close_output(stream); } /* diff --git a/src/common/consumer/consumer-stream.hpp b/src/common/consumer/consumer-stream.hpp index a8e5ddd55..ae43f7322 100644 --- a/src/common/consumer/consumer-stream.hpp +++ b/src/common/consumer/consumer-stream.hpp @@ -41,7 +41,7 @@ struct lttng_consumer_stream *consumer_stream_create( * The stream lock MUST be acquired. * The consumer data lock MUST be acquired. */ -void consumer_stream_close(struct lttng_consumer_stream *stream); +void consumer_stream_close_output(struct lttng_consumer_stream *stream); /* * Close stream on the relayd side. This call can destroy a relayd if the diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 3272c129f..c1abb58df 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -2167,7 +2167,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, consumer_stream_delete(stream, ht); /* Close down everything including the relayd if one. */ - consumer_stream_close(stream); + consumer_stream_close_output(stream); /* Destroy tracer buffers of the stream. */ consumer_stream_destroy_buffers(stream); diff --git a/src/common/kernel-consumer/kernel-consumer.cpp b/src/common/kernel-consumer/kernel-consumer.cpp index d22aea183..ce1d3d829 100644 --- a/src/common/kernel-consumer/kernel-consumer.cpp +++ b/src/common/kernel-consumer/kernel-consumer.cpp @@ -135,30 +135,6 @@ error: return ret; } -static void finalize_snapshot_stream( - struct lttng_consumer_stream *stream, uint64_t relayd_id) -{ - ASSERT_LOCKED(stream->lock); - - if (relayd_id == (uint64_t) -1ULL) { - if (stream->out_fd >= 0) { - const int ret = close(stream->out_fd); - - if (ret < 0) { - PERROR("Failed to close stream snapshot output file descriptor"); - } - - stream->out_fd = -1; - } - } else { - close_relayd_stream(stream); - stream->net_seq_idx = (uint64_t) -1ULL; - } - - lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; -} - /* * Take a snapshot of all the stream of a channel * RCU read-side lock must be held across this function to ensure existence of @@ -222,13 +198,13 @@ static int lttng_kconsumer_snapshot_channel( ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { ERR("sending stream to relayd"); - goto error_finalize_stream; + goto error_close_stream_output; } } else { ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto error_finalize_stream; + goto error_close_stream_output; } DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key); @@ -246,7 +222,7 @@ static int lttng_kconsumer_snapshot_channel( ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); - goto error_finalize_stream; + goto error_close_stream_output; } goto end_unlock; } @@ -254,19 +230,19 @@ static int lttng_kconsumer_snapshot_channel( ret = lttng_kconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking kernel snapshot"); - goto error_finalize_stream; + goto error_close_stream_output; } ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced kernel snapshot position"); - goto error_finalize_stream; + goto error_close_stream_output; } ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd kernel snapshot position"); - goto error_finalize_stream; + goto error_close_stream_output; } consumed_pos = consumer_get_consume_start_pos(consumed_pos, @@ -286,7 +262,7 @@ static int lttng_kconsumer_snapshot_channel( if (ret < 0) { if (ret != -EAGAIN) { PERROR("kernctl_get_subbuf snapshot"); - goto error_finalize_stream; + goto error_close_stream_output; } DBG("Kernel consumer get subbuf failed. Skipping it."); consumed_pos += stream->max_sb_size; @@ -336,12 +312,12 @@ static int lttng_kconsumer_snapshot_channel( ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { ERR("Snapshot kernctl_put_subbuf"); - goto error_finalize_stream; + goto error_close_stream_output; } consumed_pos += stream->max_sb_size; } - finalize_snapshot_stream(stream, relayd_id); + consumer_stream_close_output(stream); pthread_mutex_unlock(&stream->lock); } @@ -354,8 +330,8 @@ error_put_subbuf: if (ret < 0) { ERR("Snapshot kernctl_put_subbuf error path"); } -error_finalize_stream: - finalize_snapshot_stream(stream, relayd_id); +error_close_stream_output: + consumer_stream_close_output(stream); end_unlock: pthread_mutex_unlock(&stream->lock); end: diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index b08cafdff..872bc1abf 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -1206,7 +1206,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, } /* Simply close the stream so we can use it on the next snapshot. */ - consumer_stream_close(stream); + consumer_stream_close_output(stream); pthread_mutex_unlock(&stream->lock); } @@ -1218,7 +1218,7 @@ error_put_subbuf: ERR("Snapshot lttng_ust_ctl_put_subbuf"); } error_close_stream: - consumer_stream_close(stream); + consumer_stream_close_output(stream); error_unlock: pthread_mutex_unlock(&stream->lock); rcu_read_unlock();