From: David Goulet Date: Thu, 11 Oct 2012 19:22:59 +0000 (-0400) Subject: Move add data stream to the data thread X-Git-Tag: v2.1.0-rc5~18 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=c869f647b0c4476645ab9ee01e362401fb8c1e42;p=lttng-tools.git Move add data stream to the data thread As a second step of refactoring, upon receiving a data stream, we send it to the data thread that is now in charge of handling it. The ustctl_* calls are moved into the on_recv_stream function call executed once the stream is allocated and fds have been received. This also fits with the kernel consumer which mmap the buffers in the on_recv_stream call. This commit should speed up the add stream process for the session daemon. There is still some actions to move out of the session daemon poll thread to significantly gain speed, especially for network streaming. Signed-off-by: David Goulet --- diff --git a/src/common/consumer.c b/src/common/consumer.c index cf1b7b96f..be78e256f 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key, return stream; } -static void consumer_steal_stream_key(int key, struct lttng_ht *ht) +void consumer_steal_stream_key(int key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream( lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); lttng_ht_node_init_ulong(&stream->node, stream->key); + /* + * The cpu number is needed before using any ustctl_* actions. Ignored for + * the kernel so the value does not matter. + */ + pthread_mutex_lock(&consumer_data.lock); + stream->cpu = stream->chan->cpucount++; + pthread_mutex_unlock(&consumer_data.lock); + DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu," " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - stream->cpu = stream->chan->cpucount++; - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } - - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->key, consumer_data.stream_ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); /* Check and cleanup relayd */ @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) consumer_data.stream_count++; consumer_data.need_update = 1; -error: rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -1585,12 +1570,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto free_stream; } - rcu_read_lock(); - iter.iter.node = &stream->waitfd_node.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); - rcu_read_unlock(); - pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -1611,6 +1590,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto end; } + rcu_read_lock(); + iter.iter.node = &stream->waitfd_node.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); + rcu_read_unlock(); + if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret) { @@ -1697,27 +1682,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } - - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->wait_fd, ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } - /* * From here, refcounts are updated so be _careful_ when returning an error * after this point. @@ -1747,7 +1711,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); rcu_read_unlock(); -error: pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -1947,7 +1910,7 @@ void *consumer_thread_data_poll(void *data) int num_rdy, num_hup, high_prio, ret, i; struct pollfd *pollfd = NULL; /* local view of the streams */ - struct lttng_consumer_stream **local_stream = NULL; + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; struct lttng_consumer_local_data *ctx = data; @@ -2035,13 +1998,35 @@ void *consumer_thread_data_poll(void *data) */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; - char tmp; DBG("consumer_poll_pipe wake up"); /* Consume 1 byte of pipe data */ do { - pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1); + pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream, + sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); + + /* + * If the stream is NULL, just ignore it. It's also possible that + * the sessiond poll thread changed the consumer_quit state and is + * waking us up to test it. + */ + if (new_stream == NULL) { + continue; + } + + ret = consumer_add_stream(new_stream); + if (ret) { + ERR("Consumer add stream %d failed. Continuing", + new_stream->key); + /* + * At this point, if the add_stream fails, it is not in the + * hash table thus passing the NULL value here. + */ + consumer_del_stream(new_stream, NULL); + } + + /* Continue to update the local streams and handle prio ones */ continue; } @@ -2261,19 +2246,16 @@ end: consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). + * Notify the data poll thread to poll back again and test the + * consumer_quit state to quit gracefully. */ do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); + struct lttng_consumer_stream *null_stream = NULL; + + ret = write(ctx->consumer_poll_pipe[1], &null_stream, + sizeof(null_stream)); } while (ret < 0 && errno == EINTR); + rcu_unregister_thread(); return NULL; } diff --git a/src/common/consumer.h b/src/common/consumer.h index 4b225e43c..8e5891aef 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( struct consumer_relayd_sock_pair *consumer_find_relayd(int key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); +void consumer_steal_stream_key(int key, struct lttng_ht *ht); extern struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 13cbe2149..1d725c231 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -233,12 +233,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret < 0) { PERROR("write metadata pipe"); consumer_del_stream(new_stream, NULL); + goto end_nosignal; } } else { - ret = consumer_add_stream(new_stream); - if (ret) { - ERR("Consumer add stream %d failed. Continuing", - new_stream->key); + do { + ret = write(ctx->consumer_poll_pipe[1], &new_stream, + sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write data pipe"); consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -284,20 +287,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - /* - * Wake-up the other end by writing a null byte in the pipe (non-blocking). - * Important note: Because writing into the pipe is non-blocking (and - * therefore we allow dropping wakeup data, as long as there is wakeup data - * present in the pipe buffer to wake up the other end), the other end - * should perform the following sequence for waiting: - * - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). - */ - do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 11706877a..718887971 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -265,14 +265,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } } else { - ret = consumer_add_stream(new_stream); - if (ret) { - ERR("Consumer add stream %d failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ + do { + ret = write(ctx->consumer_poll_pipe[1], &new_stream, + sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write data pipe"); consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -334,20 +332,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } - /* - * Wake-up the other end by writing a null byte in the pipe (non-blocking). - * Important note: Because writing into the pipe is non-blocking (and - * therefore we allow dropping wakeup data, as long as there is wakeup data - * present in the pipe buffer to wake up the other end), the other end - * should perform the following sequence for waiting: - * - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). - */ - do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); @@ -528,6 +512,13 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) stream->out_fd = ret; } + ret = lttng_ustconsumer_add_stream(stream); + if (ret) { + consumer_del_stream(stream, NULL); + ret = -1; + goto error; + } + /* we return 0 to let the library handle the FD internally */ return 0;