Clean-up: consumerd: reduce duplication of stream output close code
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 23 Jun 2022 18:28:01 +0000 (14:28 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 4 Oct 2022 21:59:42 +0000 (17:59 -0400)
The kernel space consumer implements its own version of a stream_close
operation where it could use the common consumer code. This change
separates the tear down of the buffers (munmap, in the kernel case) from
the closing of the output stream in consumer_stream_close().

This change allows the kernel snapshot code to re-use the common
close function instead of rolling its own `finalize_snapshot_stream`.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I48e3193ceb3c15ddd8c6fcecd37ab60b793f7e66

src/common/consumer/consumer-stream.cpp
src/common/consumer/consumer-stream.hpp
src/common/consumer/consumer.cpp
src/common/kernel-consumer/kernel-consumer.cpp
src/common/ust-consumer/ust-consumer.cpp

index bb0ec0a2436db1a9fb042b29b1e53b622c2eb60e..b30e9aac0680eb6795a763f836a5da8ff18349fa 100644 (file)
@@ -850,69 +850,19 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
  * The consumer data lock MUST be acquired.
  * The stream lock MUST be acquired.
  */
-void consumer_stream_close(struct lttng_consumer_stream *stream)
+void consumer_stream_close_output(struct lttng_consumer_stream *stream)
 {
-       int ret;
        struct consumer_relayd_sock_pair *relayd;
 
        LTTNG_ASSERT(stream);
 
-       switch (the_consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (stream->mmap_base != NULL) {
-                       ret = munmap(stream->mmap_base, stream->mmap_len);
-                       if (ret != 0) {
-                               PERROR("munmap");
-                       }
-               }
-
-               if (stream->wait_fd >= 0) {
-                       ret = close(stream->wait_fd);
-                       if (ret) {
-                               PERROR("close");
-                       }
-                       stream->wait_fd = -1;
-               }
-               if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
-                       utils_close_pipe(stream->splice_pipe);
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-       {
-               /*
-                * Special case for the metadata since the wait fd is an internal pipe
-                * polled in the metadata thread.
-                */
-               if (stream->metadata_flag && stream->chan->monitor) {
-                       int rpipe = stream->ust_metadata_poll_pipe[0];
-
-                       /*
-                        * This will stop the channel timer if one and close the write side
-                        * of the metadata poll pipe.
-                        */
-                       lttng_ustconsumer_close_metadata(stream->chan);
-                       if (rpipe >= 0) {
-                               ret = close(rpipe);
-                               if (ret < 0) {
-                                       PERROR("closing metadata pipe read side");
-                               }
-                               stream->ust_metadata_poll_pipe[0] = -1;
-                       }
-               }
-               break;
-       }
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
-       }
-
        /* Close output fd. Could be a socket or local file at this point. */
        if (stream->out_fd >= 0) {
-               ret = close(stream->out_fd);
+               const auto ret = close(stream->out_fd);
                if (ret) {
-                       PERROR("close");
+                       PERROR("Failed to close stream output file descriptor");
                }
+
                stream->out_fd = -1;
        }
 
@@ -929,7 +879,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
                consumer_stream_relayd_close(stream, relayd);
+               stream->net_seq_idx = -1ULL;
        }
+
        rcu_read_unlock();
 }
 
@@ -1001,9 +953,54 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
 
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
+               if (stream->mmap_base != NULL) {
+                       const auto ret = munmap(stream->mmap_base, stream->mmap_len);
+
+                       if (ret != 0) {
+                               PERROR("munmap");
+                       }
+               }
+
+               if (stream->wait_fd >= 0) {
+                       const auto ret = close(stream->wait_fd);
+
+                       if (ret) {
+                               PERROR("close");
+                       }
+
+                       stream->wait_fd = -1;
+               }
+
+               if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+                       utils_close_pipe(stream->splice_pipe);
+               }
+
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               /*
+                * Special case for the metadata since the wait fd is an internal pipe
+                * polled in the metadata thread.
+                */
+               if (stream->metadata_flag && stream->chan->monitor) {
+                       const auto rpipe = stream->ust_metadata_poll_pipe[0];
+
+                       /*
+                        * This will stop the channel timer if one and close the write side
+                        * of the metadata poll pipe.
+                        */
+                       lttng_ustconsumer_close_metadata(stream->chan);
+                       if (rpipe >= 0) {
+                               const auto ret = close(rpipe);
+
+                               if (ret < 0) {
+                                       PERROR("closing metadata pipe read side");
+                               }
+
+                               stream->ust_metadata_poll_pipe[0] = -1;
+                       }
+               }
+
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -1024,7 +1021,7 @@ static void destroy_close_stream(struct lttng_consumer_stream *stream)
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
        /* Close down everything including the relayd if one. */
-       consumer_stream_close(stream);
+       consumer_stream_close_output(stream);
 }
 
 /*
index a8e5ddd5507b3a31ed7db90e50406a89ddab2dfe..ae43f732259a92aea78146b8621cf709eb90f23c 100644 (file)
@@ -41,7 +41,7 @@ struct lttng_consumer_stream *consumer_stream_create(
  * The stream lock MUST be acquired.
  * The consumer data lock MUST be acquired.
  */
-void consumer_stream_close(struct lttng_consumer_stream *stream);
+void consumer_stream_close_output(struct lttng_consumer_stream *stream);
 
 /*
  * Close stream on the relayd side. This call can destroy a relayd if the
index 3272c129fe353c2ad6b75f3f359391ce55ced9bf..c1abb58df616f036e3f76a9a13959c74d217883b 100644 (file)
@@ -2167,7 +2167,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        consumer_stream_delete(stream, ht);
 
        /* Close down everything including the relayd if one. */
-       consumer_stream_close(stream);
+       consumer_stream_close_output(stream);
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
 
index d22aea1831978225da90eaf5b5d9507bd248ca4a..ce1d3d8293463898d09b1d9f12eeab018dbbbf9e 100644 (file)
@@ -135,30 +135,6 @@ error:
        return ret;
 }
 
-static void finalize_snapshot_stream(
-               struct lttng_consumer_stream *stream, uint64_t relayd_id)
-{
-       ASSERT_LOCKED(stream->lock);
-
-       if (relayd_id == (uint64_t) -1ULL) {
-               if (stream->out_fd >= 0) {
-                       const int ret = close(stream->out_fd);
-
-                       if (ret < 0) {
-                               PERROR("Failed to close stream snapshot output file descriptor");
-                       }
-
-                       stream->out_fd = -1;
-               }
-       } else {
-               close_relayd_stream(stream);
-               stream->net_seq_idx = (uint64_t) -1ULL;
-       }
-
-       lttng_trace_chunk_put(stream->trace_chunk);
-       stream->trace_chunk = NULL;
-}
-
 /*
  * Take a snapshot of all the stream of a channel
  * RCU read-side lock must be held across this function to ensure existence of
@@ -222,13 +198,13 @@ static int lttng_kconsumer_snapshot_channel(
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
-                               goto error_finalize_stream;
+                               goto error_close_stream_output;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream,
                                        false);
                        if (ret < 0) {
-                               goto error_finalize_stream;
+                               goto error_close_stream_output;
                        }
                        DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
                                        stream->key);
@@ -246,7 +222,7 @@ static int lttng_kconsumer_snapshot_channel(
                        ret = kernctl_buffer_flush(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Failed to flush kernel stream");
-                               goto error_finalize_stream;
+                               goto error_close_stream_output;
                        }
                        goto end_unlock;
                }
@@ -254,19 +230,19 @@ static int lttng_kconsumer_snapshot_channel(
                ret = lttng_kconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking kernel snapshot");
-                       goto error_finalize_stream;
+                       goto error_close_stream_output;
                }
 
                ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced kernel snapshot position");
-                       goto error_finalize_stream;
+                       goto error_close_stream_output;
                }
 
                ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd kernel snapshot position");
-                       goto error_finalize_stream;
+                       goto error_close_stream_output;
                }
 
                consumed_pos = consumer_get_consume_start_pos(consumed_pos,
@@ -286,7 +262,7 @@ static int lttng_kconsumer_snapshot_channel(
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
-                                       goto error_finalize_stream;
+                                       goto error_close_stream_output;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
@@ -336,12 +312,12 @@ static int lttng_kconsumer_snapshot_channel(
                        ret = kernctl_put_subbuf(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_put_subbuf");
-                               goto error_finalize_stream;
+                               goto error_close_stream_output;
                        }
                        consumed_pos += stream->max_sb_size;
                }
 
-               finalize_snapshot_stream(stream, relayd_id);
+               consumer_stream_close_output(stream);
                pthread_mutex_unlock(&stream->lock);
        }
 
@@ -354,8 +330,8 @@ error_put_subbuf:
        if (ret < 0) {
                ERR("Snapshot kernctl_put_subbuf error path");
        }
-error_finalize_stream:
-       finalize_snapshot_stream(stream, relayd_id);
+error_close_stream_output:
+       consumer_stream_close_output(stream);
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
 end:
index b08cafdff9da6c217fb2702a7ad0e55379514de1..872bc1abf8f89c322e217b365138e88c5341d7f8 100644 (file)
@@ -1206,7 +1206,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                }
 
                /* Simply close the stream so we can use it on the next snapshot. */
-               consumer_stream_close(stream);
+               consumer_stream_close_output(stream);
                pthread_mutex_unlock(&stream->lock);
        }
 
@@ -1218,7 +1218,7 @@ error_put_subbuf:
                ERR("Snapshot lttng_ust_ctl_put_subbuf");
        }
 error_close_stream:
-       consumer_stream_close(stream);
+       consumer_stream_close_output(stream);
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
        rcu_read_unlock();
This page took 0.033431 seconds and 4 git commands to generate.