X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=152455ca64fa7e938834c01d697bb3b97e6198a8;hb=b7d17fdfe9a0f52c272b4d2640a8e9b4b396ac1a;hp=bff360edb18165a9f7f70cd48c1dc9892ffcaf93;hpb=aa01b94c96331e8a79fbf34666961d72b0bd08d1;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index bff360edb..152455ca6 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -79,6 +79,7 @@ static void destroy_channel(struct lttng_consumer_channel *channel) */ if (channel->uchan) { lttng_ustconsumer_del_channel(channel); + lttng_ustconsumer_free_channel(channel); } free(channel); } @@ -327,8 +328,12 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, /* Keep stream reference when creating metadata. */ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { channel->metadata_stream = stream; - stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0]; - stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1]; + if (channel->monitor) { + /* Set metadata poll pipe if we created one */ + memcpy(stream->ust_metadata_poll_pipe, + ust_metadata_pipe, + sizeof(ust_metadata_pipe)); + } } } @@ -859,7 +864,7 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, - uint64_t max_stream_size, struct lttng_consumer_local_data *ctx) + uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -941,12 +946,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, /* * The original value is sent back if max stream size is larger than - * the possible size of the snapshot. Also, we asume that the session + * the possible size of the snapshot. Also, we assume that the session * daemon should never send a maximum stream size that is lower than * subbuffer size. */ - consumed_pos = consumer_get_consumed_maxsize(consumed_pos, - produced_pos, max_stream_size); + consumed_pos = consumer_get_consume_start_pos(consumed_pos, + produced_pos, nb_packets_per_stream, + stream->max_sb_size); while (consumed_pos < produced_pos) { ssize_t read_len; @@ -1023,7 +1029,12 @@ error: } /* - * Receive the metadata updates from the sessiond. + * Receive the metadata updates from the sessiond. Supports receiving + * overlapping metadata, but is needs to always belong to a contiguous + * range starting from 0. + * Be careful about the locks held when calling this function: it needs + * the metadata cache flush to concurrently progress in order to + * complete. */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, struct lttng_consumer_channel *channel, @@ -1435,6 +1446,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); + if (!len) { + /* + * There is nothing to receive. We have simply + * checked whether the channel can be found. + */ + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + goto end_msg_sessiond; + } + /* Tell session daemon we are ready to receive the metadata. */ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0) { @@ -1489,7 +1509,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel.max_stream_size, + msg.u.snapshot_channel.nb_packets_per_stream, ctx); if (ret < 0) { ERR("Snapshot channel failed"); @@ -1668,6 +1688,13 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) if (chan->switch_timer_enabled == 1) { consumer_timer_switch_stop(chan); } +} + +void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) +{ + assert(chan); + assert(chan->uchan); + consumer_metadata_cache_destroy(chan); ustctl_destroy_channel(chan->uchan); } @@ -1768,7 +1795,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->contiguous + if (stream->chan->metadata_cache->max_offset == stream->ust_metadata_pushed) { ret = 0; goto end; @@ -1776,7 +1803,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->contiguous + stream->chan->metadata_cache->max_offset - stream->ust_metadata_pushed); assert(write_len != 0); if (write_len < 0) { @@ -1786,7 +1813,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) } stream->ust_metadata_pushed += write_len; - assert(stream->chan->metadata_cache->contiguous >= + assert(stream->chan->metadata_cache->max_offset >= stream->ust_metadata_pushed); ret = write_len; @@ -1800,7 +1827,9 @@ end: * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. * - * Metadata stream lock MUST be acquired. + * Metadata stream lock is held here, but we need to release it when + * interacting with sessiond, else we cause a deadlock with live + * awaiting on metadata to be pushed out. * * Return 0 if new metadatda is available, EAGAIN if the metadata stream * is empty or a negative value on error. @@ -1814,6 +1843,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, assert(ctx); assert(metadata); + pthread_mutex_unlock(&metadata->lock); /* * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. @@ -1822,6 +1852,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, if (ret < 0) { goto end; } + pthread_mutex_lock(&metadata->lock); ret = commit_one_metadata_packet(metadata); if (ret <= 0) { @@ -1881,7 +1912,7 @@ static int notify_if_more_data(struct lttng_consumer_stream *stream, goto end; } - ret = ustctl_put_next_subbuf(ustream); + ret = ustctl_put_subbuf(ustream); assert(!ret); /* This stream still has data. Flag it and wake up the data thread. */ @@ -2048,7 +2079,23 @@ retry: /* * In live, block until all the metadata is sent. */ + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + err = consumer_stream_sync_metadata(ctx, stream->session_id); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) consumer_flush_ust_index(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } + if (err < 0) { goto end; } @@ -2129,7 +2176,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) uint64_t contiguous, pushed; /* Ease our life a bit. */ - contiguous = stream->chan->metadata_cache->contiguous; + contiguous = stream->chan->metadata_cache->max_offset; pushed = stream->ust_metadata_pushed; /* @@ -2258,6 +2305,10 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * function or any of its callees. Timers have a very strict locking * semantic with respect to teardown. Failure to respect this semantic * introduces deadlocks. + * + * DON'T hold the metadata lock when calling this function, else this + * can cause deadlock involving consumer awaiting for metadata to be + * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, int timer, int wait)