Fix: create/destroy a splice_pipe per stream
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 12 Nov 2014 23:36:17 +0000 (18:36 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 17 Nov 2014 18:43:06 +0000 (13:43 -0500)
We had a per-thread splice_pipe (one for data and one for metadata), but
in case of error, we would end up filling the write side of the pipe and
never emptying it. This could lead to leaking data from one session to
the other, but also to stall the consumer trying to splice into a full
pipe.

Now we create a splice_pipe per-stream, so it is destroyed when the
session is destroyed.

Fixes: #726
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer-stream.c
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c

index 8fe02e74eaa8dfe59b90ed4fbe2d6b4cf13b637b..92576aab408a193530e56f1d3739c47f43d58b35 100644 (file)
@@ -28,6 +28,7 @@
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/utils.h>
 
 #include "consumer-stream.h"
 
@@ -119,6 +120,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                        }
                        stream->wait_fd = -1;
                }
+               if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+                       utils_close_pipe(stream->splice_pipe);
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
index 86a34ddad86c452a4425d3d992d709c3fda5f07c..ee03795532aa505b8383215710cd6aac96835dc9 100644 (file)
@@ -1301,12 +1301,6 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_quit_pipe;
        }
 
-       ret = pipe(ctx->consumer_thread_pipe);
-       if (ret < 0) {
-               PERROR("Error creating thread pipe");
-               goto error_thread_pipe;
-       }
-
        ret = pipe(ctx->consumer_channel_pipe);
        if (ret < 0) {
                PERROR("Error creating channel pipe");
@@ -1318,20 +1312,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
-       ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
-       if (ret < 0) {
-               goto error_splice_pipe;
-       }
-
        return ctx;
 
-error_splice_pipe:
-       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
 error_channel_pipe:
-       utils_close_pipe(ctx->consumer_thread_pipe);
-error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
@@ -1418,13 +1403,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        if (ret) {
                PERROR("close");
        }
-       utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
-       utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
        unlink(ctx->consumer_command_sock_path);
        free(ctx);
@@ -1717,17 +1700,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        goto end;
                }
        }
-
-       /*
-        * Choose right pipe for splice. Metadata and trace data are handled by
-        * different threads hence the use of two pipes in order not to race or
-        * corrupt the written data.
-        */
-       if (stream->metadata_flag) {
-               splice_pipe = ctx->consumer_splice_metadata_pipe;
-       } else {
-               splice_pipe = ctx->consumer_thread_pipe;
-       }
+       splice_pipe = stream->splice_pipe;
 
        /* Write metadata stream id before payload */
        if (relayd) {
@@ -1833,7 +1806,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                /* Splice data out */
                ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
                                ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+               DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
+                               outfd, ret_splice);
                if (ret_splice < 0) {
                        ret = errno;
                        written = -ret;
index 4ac823c017d03754ed545a9a1288df707b84778f..1e378f04ea631ea4387a1940c407599d19a63966 100644 (file)
@@ -336,6 +336,11 @@ struct lttng_consumer_stream {
         */
        int index_fd;
 
+       /*
+        * Local pipe to extract data when using splice.
+        */
+       int splice_pipe[2];
+
        /*
         * Rendez-vous point between data and metadata stream in live mode.
         */
@@ -451,9 +456,7 @@ struct lttng_consumer_local_data {
        /* socket to exchange commands with sessiond */
        char *consumer_command_sock_path;
        /* communication with splice */
-       int consumer_thread_pipe[2];
        int consumer_channel_pipe[2];
-       int consumer_splice_metadata_pipe[2];
        /* Data stream poll thread pipe. To transfer data stream to the thread */
        struct lttng_pipe *consumer_data_pipe;
 
index 6c047f9cbfbae5df0412f09088aba23f0fabaf6c..79b25d35d2970d827f41637b51987e2e1580dc48 100644 (file)
@@ -641,6 +641,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                switch (channel->output) {
                case CONSUMER_CHANNEL_SPLICE:
                        new_stream->output = LTTNG_EVENT_SPLICE;
+                       ret = utils_create_pipe(new_stream->splice_pipe);
+                       if (ret < 0) {
+                               goto end_nosignal;
+                       }
                        break;
                case CONSUMER_CHANNEL_MMAP:
                        new_stream->output = LTTNG_EVENT_MMAP;
This page took 0.030099 seconds and 4 git commands to generate.