X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=2612063e2ab68b115113f6c8081b35708714f38d;hb=27c3d93497da2a109e6e42e9ef8c78e42caa127e;hp=ddfca408b07739b6d18b9853b168b1a60bad5594;hpb=e1eb1e0e23e27f60cb610708fc460dc0c8045d33;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index ddfca408b..2612063e2 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -564,6 +564,19 @@ free_stream_rcu: call_rcu(&stream->node.head, free_stream_rcu); } +/* + * XXX naming of del vs destroy is all mixed up. + */ +void consumer_del_stream_for_data(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + +void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, metadata_ht); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -642,9 +655,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -static int add_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_data_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = data_ht; int ret = 0; struct consumer_relayd_sock_pair *relayd; @@ -706,6 +719,11 @@ static int add_stream(struct lttng_consumer_stream *stream, return ret; } +void consumer_del_data_stream(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + /* * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. @@ -1349,6 +1367,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { + ret = -EPIPE; goto end; } } @@ -1358,28 +1377,31 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( case LTTNG_CONSUMER_KERNEL: mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret != 0) { + PERROR("tracer ctl get_mmap_read_offset"); + written = -errno; + goto end; + } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: mmap_base = lttng_ustctl_get_mmap_base(stream); if (!mmap_base) { ERR("read mmap get mmap base for stream %s", stream->name); - written = -1; + written = -EPERM; goto end; } ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); - + if (ret != 0) { + PERROR("tracer ctl get_mmap_read_offset"); + written = ret; + goto end; + } break; default: ERR("Unknown consumer_data type"); assert(0); } - if (ret != 0) { - errno = -ret; - PERROR("tracer ctl get_mmap_read_offset"); - written = ret; - goto end; - } /* Handle stream on the relayd if the output is on the network */ if (relayd) { @@ -1442,6 +1464,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( outfd = stream->out_fd = ret; /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; } @@ -1460,7 +1484,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( */ DBG("Error in file write mmap"); if (written == 0) { - written = ret; + written = -errno; } /* Socket operation failed. We consider the relayd dead */ if (errno == EPIPE || errno == EINVAL) { @@ -1548,6 +1572,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { + ret = -EPIPE; goto end; } } @@ -1624,6 +1649,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( outfd = stream->out_fd = ret; /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; } @@ -2009,9 +2036,9 @@ free_stream_rcu: * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -static int add_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = metadata_ht; int ret = 0; struct consumer_relayd_sock_pair *relayd; struct lttng_ht_iter iter; @@ -2223,7 +2250,7 @@ restart: pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, sizeof(stream)); if (pipe_len < 0) { - ERR("read metadata stream, ret: %ld", pipe_len); + ERR("read metadata stream, ret: %zd", pipe_len); /* * Continue here to handle the rest of the streams. */ @@ -2240,14 +2267,6 @@ restart: DBG("Adding metadata stream %d to poll set", stream->wait_fd); - ret = add_metadata_stream(stream, metadata_ht); - if (ret) { - ERR("Unable to add metadata stream"); - /* Stream was not setup properly. Continuing. */ - consumer_del_metadata_stream(stream, NULL); - continue; - } - /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI); @@ -2439,7 +2458,7 @@ void *consumer_thread_data_poll(void *data) pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, &new_stream, sizeof(new_stream)); if (pipe_readlen < 0) { - ERR("Consumer data pipe ret %ld", pipe_readlen); + ERR("Consumer data pipe ret %zd", pipe_readlen); /* Continue so we can at least handle the current stream(s). */ continue; } @@ -2454,17 +2473,6 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = add_stream(new_stream, data_ht); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ - consumer_del_stream(new_stream, NULL); - } - /* Continue to update the local streams and handle prio ones */ continue; }