}
case LTTNG_CONSUMER_ADD_STREAM:
{
- int fd, stream_pipe;
+ int fd;
+ struct lttng_pipe *stream_pipe;
struct consumer_relayd_sock_pair *relayd = NULL;
struct lttng_consumer_stream *new_stream;
struct lttng_consumer_channel *channel;
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
- stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
+ stream_pipe = ctx->consumer_metadata_pipe;
} else {
- stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
+ stream_pipe = ctx->consumer_data_pipe;
}
- do {
- ret = write(stream_pipe, &new_stream, sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
- PERROR("Consumer write %s stream to pipe %d",
+ ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
- stream_pipe);
+ lttng_pipe_get_writefd(stream_pipe));
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
static int send_stream_to_thread(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
- int ret, stream_pipe;
+ int ret;
+ struct lttng_pipe *stream_pipe;
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
- stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
+ stream_pipe = ctx->consumer_metadata_pipe;
} else {
- stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
+ stream_pipe = ctx->consumer_data_pipe;
}
- do {
- ret = write(stream_pipe, &stream, sizeof(stream));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
- PERROR("Consumer write %s stream to pipe %d",
- stream->metadata_flag ? "metadata" : "data", stream_pipe);
+ ERR("Consumer write %s stream to pipe %d",
+ stream->metadata_flag ? "metadata" : "data",
+ lttng_pipe_get_writefd(stream_pipe));
}
return ret;