#include <common/kernel-consumer/kernel-consumer.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
+#include <common/utils.h>
#include "consumer-stream.h"
}
stream->wait_fd = -1;
}
+ if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+ utils_close_pipe(stream->splice_pipe);
+ }
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
goto error_quit_pipe;
}
- ret = pipe(ctx->consumer_thread_pipe);
- if (ret < 0) {
- PERROR("Error creating thread pipe");
- goto error_thread_pipe;
- }
-
ret = pipe(ctx->consumer_channel_pipe);
if (ret < 0) {
PERROR("Error creating channel pipe");
goto error_metadata_pipe;
}
- ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
- if (ret < 0) {
- goto error_splice_pipe;
- }
-
return ctx;
-error_splice_pipe:
- lttng_pipe_destroy(ctx->consumer_metadata_pipe);
error_metadata_pipe:
utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
- utils_close_pipe(ctx->consumer_thread_pipe);
-error_thread_pipe:
utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
if (ret) {
PERROR("close");
}
- utils_close_pipe(ctx->consumer_thread_pipe);
utils_close_pipe(ctx->consumer_channel_pipe);
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_metadata_pipe);
lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
utils_close_pipe(ctx->consumer_should_quit);
- utils_close_pipe(ctx->consumer_splice_metadata_pipe);
unlink(ctx->consumer_command_sock_path);
free(ctx);
goto end;
}
}
-
- /*
- * Choose right pipe for splice. Metadata and trace data are handled by
- * different threads hence the use of two pipes in order not to race or
- * corrupt the written data.
- */
- if (stream->metadata_flag) {
- splice_pipe = ctx->consumer_splice_metadata_pipe;
- } else {
- splice_pipe = ctx->consumer_thread_pipe;
- }
+ splice_pipe = stream->splice_pipe;
/* Write metadata stream id before payload */
if (relayd) {
/* Splice data out */
ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+ DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
+ outfd, ret_splice);
if (ret_splice < 0) {
ret = errno;
written = -ret;
*/
int index_fd;
+ /*
+ * Local pipe to extract data when using splice.
+ */
+ int splice_pipe[2];
+
/*
* Rendez-vous point between data and metadata stream in live mode.
*/
/* socket to exchange commands with sessiond */
char *consumer_command_sock_path;
/* communication with splice */
- int consumer_thread_pipe[2];
int consumer_channel_pipe[2];
- int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
struct lttng_pipe *consumer_data_pipe;