summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
9fd9263)
Also, an important change here is that this pipe is no longer in non
block mode. Before sending stream's pointer over this pipe, only one
byte was written thus making it unlikely to fail in a read/write race
condition between threads. Now, 4 bytes are written so keeping this pipe
non block with threads is a bit of a "looking for trouble situation".
The lttng pipe wrappers make sure that the read and write side are
synchronized between threads using a mutex for each side. Furthermore,
the read and write handle partial I/O and EINTR meaning that once the
call returns we are sure that either everything was read/written or an
error occured thus making it not possible for the read side to block
indefinitely after a poll event.
Fixes #475
Signed-off-by: David Goulet <dgoulet@efficios.com>
} while (ret < 0 && errno == EINTR);
}
} while (ret < 0 && errno == EINTR);
}
+/*
+ * Notify a thread lttng pipe to poll back again. This usually means that some
+ * global state has changed so we just send back the thread in a poll wait
+ * call.
+ */
+static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
+{
+ struct lttng_consumer_stream *null_stream = NULL;
+
+ assert(pipe);
+
+ (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+}
+
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
uint64_t key,
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
uint64_t key,
* read of this status which happens AFTER receiving this notify.
*/
if (ctx) {
* read of this status which happens AFTER receiving this notify.
*/
if (ctx) {
- notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ notify_thread_lttng_pipe(ctx->consumer_data_pipe);
notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
}
}
notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
}
}
* Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
* Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
- (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
+ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
- ret = pipe(ctx->consumer_data_pipe);
- if (ret < 0) {
- PERROR("Error creating poll pipe");
+ ctx->consumer_data_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_data_pipe) {
- /* set read end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto error_poll_fcntl;
- }
-
- /* set write end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto error_poll_fcntl;
- }
-
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
utils_close_pipe(ctx->consumer_thread_pipe);
error_thread_pipe:
utils_close_pipe(ctx->consumer_should_quit);
utils_close_pipe(ctx->consumer_thread_pipe);
error_thread_pipe:
utils_close_pipe(ctx->consumer_should_quit);
- utils_close_pipe(ctx->consumer_data_pipe);
+ lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
free(ctx);
error:
error_poll_pipe:
free(ctx);
error:
}
utils_close_pipe(ctx->consumer_thread_pipe);
utils_close_pipe(ctx->consumer_channel_pipe);
}
utils_close_pipe(ctx->consumer_thread_pipe);
utils_close_pipe(ctx->consumer_channel_pipe);
- utils_close_pipe(ctx->consumer_data_pipe);
+ lttng_pipe_destroy(ctx->consumer_data_pipe);
utils_close_pipe(ctx->consumer_should_quit);
utils_close_pipe(ctx->consumer_splice_metadata_pipe);
utils_close_pipe(ctx->consumer_should_quit);
utils_close_pipe(ctx->consumer_splice_metadata_pipe);
ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
- /* Consume 1 byte of pipe data */
- do {
- pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
- sizeof(new_stream));
- } while (pipe_readlen == -1 && errno == EINTR);
+ pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+ &new_stream, sizeof(new_stream));
- PERROR("read consumer data pipe");
+ ERR("Consumer data pipe ret %ld", pipe_readlen);
/* Continue so we can at least handle the current stream(s). */
continue;
}
/* Continue so we can at least handle the current stream(s). */
continue;
}
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
*/
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
*/
- notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ notify_thread_lttng_pipe(ctx->consumer_data_pipe);
notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
#include <common/compat/fcntl.h>
#include <common/compat/uuid.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/compat/fcntl.h>
#include <common/compat/uuid.h>
#include <common/sessiond-comm/sessiond-comm.h>
+#include <common/pipe.h>
/* Commands for consumer */
enum lttng_consumer_command {
/* Commands for consumer */
enum lttng_consumer_command {
int consumer_channel_pipe[2];
int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
int consumer_channel_pipe[2];
int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
- int consumer_data_pipe[2];
+ struct lttng_pipe *consumer_data_pipe;
/* to let the signal handler wake up the fd receiver thread */
int consumer_should_quit[2];
/* Metadata poll thread pipe. Transfer metadata stream to it */
/* to let the signal handler wake up the fd receiver thread */
int consumer_should_quit[2];
/* Metadata poll thread pipe. Transfer metadata stream to it */
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/sessiond-comm/relayd.h>
#include <common/compat/fcntl.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/sessiond-comm/relayd.h>
#include <common/compat/fcntl.h>
+#include <common/pipe.h>
#include <common/relayd/relayd.h>
#include <common/utils.h>
#include <common/relayd/relayd.h>
#include <common/utils.h>
if (new_stream->metadata_flag) {
stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
if (new_stream->metadata_flag) {
stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
- stream_pipe = ctx->consumer_data_pipe[1];
+ stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
if (stream->metadata_flag) {
stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
if (stream->metadata_flag) {
stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
- stream_pipe = ctx->consumer_data_pipe[1];
+ stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);