{
int ret;
char *metadata_str = NULL;
- size_t len, offset;
+ size_t len, offset, new_metadata_len_sent;
ssize_t ret_val;
+ uint64_t metadata_key;
assert(registry);
assert(socket);
+ metadata_key = registry->metadata_key;
+
/*
* Means that no metadata was assigned to the session. This can
* happen if no start has been done previously.
*/
- if (!registry->metadata_key) {
+ if (!metadata_key) {
return 0;
}
offset = registry->metadata_len_sent;
len = registry->metadata_len - registry->metadata_len_sent;
+ new_metadata_len_sent = registry->metadata_len;
if (len == 0) {
DBG3("No metadata to push for metadata key %" PRIu64,
registry->metadata_key);
ret_val = -ENOMEM;
goto error;
}
- /* Copy what we haven't send out. */
+ /* Copy what we haven't sent out. */
memcpy(metadata_str, registry->metadata + offset, len);
- registry->metadata_len_sent += len;
push_data:
- ret = consumer_push_metadata(socket, registry->metadata_key,
+ pthread_mutex_unlock(&registry->lock);
+ /*
+ * We need to unlock the registry while we push metadata to
+ * break a circular dependency between the consumerd metadata
+ * lock and the sessiond registry lock. Indeed, pushing metadata
+ * to the consumerd waits until it has been pushed all the way
+ * to the relayd, and doing so requires grabbing the consumerd
+ * metadata lock. If a concurrent metadata request is being
+ * performed by the consumerd, it can try to grab the sessiond
+ * registry lock while holding the consumerd metadata lock.
+ * Those push and pull schemes are performed on two different
+ * bidirectional communication sockets.
+ */
+ ret = consumer_push_metadata(socket, metadata_key,
metadata_str, len, offset);
+ pthread_mutex_lock(&registry->lock);
if (ret < 0) {
/*
* There is an acceptable race here between the registry
*/
if (ret == -LTTCOMM_CONSUMERD_CHANNEL_FAIL) {
ret = 0;
+ } else {
+ ERR("Error pushing metadata to consumer");
}
-
- /*
- * Update back the actual metadata len sent since it
- * failed here.
- */
- registry->metadata_len_sent -= len;
ret_val = ret;
goto error_push;
+ } else {
+ /*
+ * Metadata may have been concurrently pushed, since
+ * we're not holding the registry lock while pushing to
+ * consumer. This is handled by the fact that we send
+ * the metadata content, size, and the offset at which
+ * that metadata belongs. This may arrive out of order
+ * on the consumer side; the consumer supports
+ * overlapping fragments, as long as they remain
+ * contiguous starting from offset 0. We keep the
+ * largest metadata_len_sent value of the concurrent
+ * sends.
+ */
+ registry->metadata_len_sent =
+ max_t(size_t, registry->metadata_len_sent,
+ new_metadata_len_sent);
}
-
free(metadata_str);
return len;
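
As a rough illustration of the unlock-push-relock pattern above (not lttng-tools code; registry_stub, push_to_consumer and every field name here are invented), the sketch below drops the registry lock around the blocking push and, on return, keeps the largest metadata_len_sent value seen by concurrent pushers:

#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

struct registry_stub {
	pthread_mutex_t lock;
	char *metadata;
	size_t metadata_len;
	size_t metadata_len_sent;
};

/* Stand-in for the blocking push to the consumer daemon. */
static int push_to_consumer(const char *buf, size_t len, size_t offset)
{
	(void) buf; (void) len; (void) offset;
	return 0;
}

#define MAX_SZ(a, b) ((a) > (b) ? (a) : (b))

/* Caller holds reg->lock on entry and on return. */
static ssize_t push_pending_metadata(struct registry_stub *reg)
{
	size_t offset = reg->metadata_len_sent;
	size_t len = reg->metadata_len - offset;
	size_t new_len_sent = reg->metadata_len;
	char *copy;
	int ret;

	if (len == 0) {
		return 0;
	}
	copy = malloc(len);
	if (!copy) {
		return -1;
	}
	memcpy(copy, reg->metadata + offset, len);

	/* Drop the registry lock while the blocking push is in flight. */
	pthread_mutex_unlock(&reg->lock);
	ret = push_to_consumer(copy, len, offset);
	pthread_mutex_lock(&reg->lock);

	if (ret == 0) {
		/* Keep the largest value observed by concurrent pushers. */
		reg->metadata_len_sent = MAX_SZ(reg->metadata_len_sent,
				new_len_sent);
	}
	free(copy);
	return ret == 0 ? (ssize_t) len : -1;
}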
/*
* Write metadata to the cache, extend the cache if necessary. We support
- * non-contiguous updates but not overlapping ones. If there is contiguous
- * metadata in the cache, we send it to the ring buffer. The metadata cache
+ * overlapping updates, but they need to be contiguous. Send the
+ * contiguous metadata in the cache to the ring buffer. The metadata cache
* lock MUST be acquired to write in the cache.
*
* Return 0 on success, a negative value on error.
}
memcpy(cache->data + offset, data, len);
- cache->total_bytes_written += len;
if (offset + len > cache->max_offset) {
- cache->max_offset = offset + len;
- }
-
- if (cache->max_offset == cache->total_bytes_written) {
char dummy = 'c';
- cache->contiguous = cache->max_offset;
+ cache->max_offset = offset + len;
if (channel->monitor) {
size_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
&dummy, 1);
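
To make the new max_offset bookkeeping concrete, here is a toy, standalone example (toy_cache and toy_cache_write are invented names, not the consumer cache API) showing that overlapping writes only ever extend the high-water mark, which is valid precisely because cached data stays contiguous from offset 0:

#include <assert.h>
#include <stdint.h>

struct toy_cache {
	uint64_t max_offset;
};

static void toy_cache_write(struct toy_cache *c, uint64_t offset, uint64_t len)
{
	/* Overlapping writes are fine; only extend the high-water mark. */
	if (offset + len > c->max_offset) {
		c->max_offset = offset + len;
	}
}

int main(void)
{
	struct toy_cache c = { 0 };

	toy_cache_write(&c, 0, 64);	/* cache covers [0, 64) */
	toy_cache_write(&c, 32, 64);	/* overlaps, extends to [0, 96) */
	toy_cache_write(&c, 0, 96);	/* full overlap, no change */
	assert(c.max_offset == 96);
	return 0;
}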
struct consumer_metadata_cache {
char *data;
uint64_t cache_alloc_size;
- /*
- * How many bytes from the cache are written contiguously.
- */
- uint64_t contiguous;
- /*
- * How many bytes are written in the buffer (excluding the wholes).
- */
- uint64_t total_bytes_written;
/*
* The upper-limit of data written inside the buffer.
*
- * With the total_bytes_written it allows us to keep track of when the
- * cache contains contiguous metadata ready to be sent to the RB.
- * The metadata cache updates must not overlap.
+ * All cached data is contiguous.
*/
uint64_t max_offset;
/*
return ret;
}
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
{
uint64_t ts, stream_id;
int ret;
- /*
- * While holding the stream mutex, try to take a snapshot, if it
- * succeeds, it means that data is ready to be sent, just let the data
- * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
- * means that there is no data to read after the flush, so we can
- * safely send the empty index.
- */
- pthread_mutex_lock(&stream->lock);
ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
if (ret < 0) {
ERR("Failed to get the current timestamp");
- goto error_unlock;
+ goto end;
}
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto error_unlock;
+ goto end;
}
ret = kernctl_snapshot(stream->wait_fd);
if (ret < 0) {
if (errno != EAGAIN && errno != ENODATA) {
PERROR("live timer kernel snapshot");
ret = -1;
- goto error_unlock;
+ goto end;
}
ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
if (ret < 0) {
PERROR("kernctl_get_stream_id");
- goto error_unlock;
+ goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
- goto error_unlock;
+ goto end;
}
}
ret = 0;
-
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
+end:
return ret;
}
-static int check_ust_stream(struct lttng_consumer_stream *stream)
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
{
- uint64_t ts, stream_id;
int ret;
- assert(stream);
- assert(stream->ustream);
/*
* While holding the stream mutex, try to take a snapshot, if it
* succeeds, it means that data is ready to be sent, just let the data
* thread handle that. Otherwise, if the snapshot returns EAGAIN, it
* means that there is no data to read after the flush, so we can
* safely send the empty index.
+ *
+ * Do a trylock and, if it fails, check whether we are waiting
+ * on metadata. Bail out if the stream is indeed waiting for
+ * metadata to be pushed. Busy-wait on the trylock otherwise.
*/
- pthread_mutex_lock(&stream->lock);
+ for (;;) {
+ ret = pthread_mutex_trylock(&stream->lock);
+ switch (ret) {
+ case 0:
+ break; /* We have the lock. */
+ case EBUSY:
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ if (stream->waiting_on_metadata) {
+ ret = 0;
+ stream->missed_metadata_flush = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ goto end; /* Bail out. */
+ }
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ /* Try again. */
+ caa_cpu_relax();
+ continue;
+ default:
+ ERR("Unexpected pthread_mutex_trylock error %d", ret);
+ ret = -1;
+ goto end;
+ }
+ break;
+ }
+ ret = consumer_flush_kernel_index(stream);
+ pthread_mutex_unlock(&stream->lock);
+end:
+ return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+ uint64_t ts, stream_id;
+ int ret;
+
ret = cds_lfht_is_node_deleted(&stream->node.node);
if (ret) {
- goto error_unlock;
+ goto end;
}
ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
if (ret < 0) {
ERR("Failed to get the current timestamp");
- goto error_unlock;
+ goto end;
}
lttng_ustconsumer_flush_buffer(stream, 1);
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
	if (ret != -EAGAIN) {
ERR("Taking UST snapshot");
ret = -1;
- goto error_unlock;
+ goto end;
}
ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
if (ret < 0) {
PERROR("ustctl_get_stream_id");
- goto error_unlock;
+ goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
- goto error_unlock;
+ goto end;
}
}
ret = 0;
+end:
+ return ret;
+}
-error_unlock:
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(stream->ustream);
+ /*
+ * While holding the stream mutex, try to take a snapshot, if it
+ * succeeds, it means that data is ready to be sent, just let the data
+ * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+ * means that there is no data to read after the flush, so we can
+ * safely send the empty index.
+ *
+ * Do a trylock and, if it fails, check whether we are waiting
+ * on metadata. Bail out if the stream is indeed waiting for
+ * metadata to be pushed. Busy-wait on the trylock otherwise.
+ */
+ for (;;) {
+ ret = pthread_mutex_trylock(&stream->lock);
+ switch (ret) {
+ case 0:
+ break; /* We have the lock. */
+ case EBUSY:
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ if (stream->waiting_on_metadata) {
+ ret = 0;
+ stream->missed_metadata_flush = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ goto end; /* Bail out. */
+ }
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ /* Try again. */
+ caa_cpu_relax();
+ continue;
+ default:
+ ERR("Unexpected pthread_mutex_trylock error %d", ret);
+ ret = -1;
+ goto end;
+ }
+ break;
+ }
+ ret = consumer_flush_ust_index(stream);
pthread_mutex_unlock(&stream->lock);
+end:
return ret;
}
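
The trylock dance in both check_*_stream() functions above follows the same shape. The standalone sketch below (toy types and a stub flush, not the lttng-tools API, with sched_yield() standing in for caa_cpu_relax()) shows the timer-thread side: take the stream lock opportunistically, and if the data path is blocked waiting on metadata, record a missed flush instead of waiting behind it:

#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>

struct toy_stream {
	pthread_mutex_t lock;
	pthread_mutex_t metadata_timer_lock;
	bool waiting_on_metadata;
	bool missed_metadata_flush;
};

/* Stand-in for consumer_flush_{kernel,ust}_index(). */
static int toy_flush_index(struct toy_stream *stream)
{
	(void) stream;
	return 0;
}

static int toy_timer_check_stream(struct toy_stream *stream)
{
	int ret;

	for (;;) {
		ret = pthread_mutex_trylock(&stream->lock);
		if (ret == 0) {
			break;	/* We have the stream lock. */
		}
		if (ret != EBUSY) {
			return -1;
		}
		pthread_mutex_lock(&stream->metadata_timer_lock);
		if (stream->waiting_on_metadata) {
			/* Let the data path replay the flush later. */
			stream->missed_metadata_flush = true;
			pthread_mutex_unlock(&stream->metadata_timer_lock);
			return 0;
		}
		pthread_mutex_unlock(&stream->metadata_timer_lock);
		sched_yield();	/* Busy-wait politely, then retry. */
	}
	ret = toy_flush_index(stream);
	pthread_mutex_unlock(&stream->lock);
	return ret;
}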
void *consumer_timer_thread(void *data);
void consumer_signal_init(void);
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream);
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream);
+
#endif /* CONSUMER_TIMER_H */
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
stream->index_fd = -1;
pthread_mutex_init(&stream->lock, NULL);
+ pthread_mutex_init(&stream->metadata_timer_lock, NULL);
/* If channel is the metadata, flag this stream as metadata. */
if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
int shm_fd_is_copy;
int data_read;
int hangup_flush_done;
+
+ /*
+ * metadata_timer_lock protects flags waiting_on_metadata and
+ * missed_metadata_flush.
+ */
+ pthread_mutex_t metadata_timer_lock;
+ /*
+ * Flag set when awaiting metadata to be pushed. Used in the
+ * timer thread to skip waiting on the stream (and stream lock) to
+ * ensure we can proceed to flushing metadata in live mode.
+ */
+ bool waiting_on_metadata;
+ /* Raised when a timer misses a metadata flush. */
+ bool missed_metadata_flush;
+
enum lttng_event_output output;
/* Maximum subbuffer size. */
unsigned long max_sb_size;
/*
* In live, block until all the metadata is sent.
*/
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ assert(!stream->missed_metadata_flush);
+ stream->waiting_on_metadata = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+
err = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ stream->waiting_on_metadata = false;
+ if (stream->missed_metadata_flush) {
+ stream->missed_metadata_flush = false;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ (void) consumer_flush_kernel_index(stream);
+ } else {
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ }
if (err < 0) {
goto end;
}
}
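
The data-path half of that handshake, used above around consumer_stream_sync_metadata(), can be sketched on its own as well (again with invented toy_* names and stubs): raise waiting_on_metadata for the duration of the blocking sync, then replay any index flush the timer thread had to skip:

#include <pthread.h>
#include <stdbool.h>

struct toy_stream {
	pthread_mutex_t metadata_timer_lock;
	bool waiting_on_metadata;
	bool missed_metadata_flush;
};

/* Stand-ins for consumer_stream_sync_metadata() and the index flush. */
static int toy_sync_metadata(struct toy_stream *stream)
{
	(void) stream;
	return 0;
}

static int toy_flush_index(struct toy_stream *stream)
{
	(void) stream;
	return 0;
}

static int toy_data_path_sync(struct toy_stream *stream)
{
	int err;

	/* Tell the timer thread not to wait behind us. */
	pthread_mutex_lock(&stream->metadata_timer_lock);
	stream->waiting_on_metadata = true;
	pthread_mutex_unlock(&stream->metadata_timer_lock);

	err = toy_sync_metadata(stream);	/* May block in live mode. */

	pthread_mutex_lock(&stream->metadata_timer_lock);
	stream->waiting_on_metadata = false;
	if (stream->missed_metadata_flush) {
		stream->missed_metadata_flush = false;
		pthread_mutex_unlock(&stream->metadata_timer_lock);
		/* Replay the index flush the timer thread skipped. */
		(void) toy_flush_index(stream);
	} else {
		pthread_mutex_unlock(&stream->metadata_timer_lock);
	}
	return err;
}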
/*
- * Receive the metadata updates from the sessiond.
+ * Receive the metadata updates from the sessiond. Supports receiving
+ * overlapping metadata, but it must always belong to a contiguous
+ * range starting from offset 0.
+ * Be careful about the locks held when calling this function: it needs
+ * the metadata cache flush to concurrently progress in order to
+ * complete.
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, struct lttng_consumer_channel *channel,
health_code_update();
+ if (!len) {
+ /*
+ * There is nothing to receive. We have simply
+ * checked whether the channel can be found.
+ */
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ goto end_msg_sessiond;
+ }
+
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0) {
int ret;
pthread_mutex_lock(&stream->chan->metadata_cache->lock);
- if (stream->chan->metadata_cache->contiguous
+ if (stream->chan->metadata_cache->max_offset
== stream->ust_metadata_pushed) {
ret = 0;
goto end;
write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
&stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
- stream->chan->metadata_cache->contiguous
+ stream->chan->metadata_cache->max_offset
- stream->ust_metadata_pushed);
assert(write_len != 0);
if (write_len < 0) {
}
stream->ust_metadata_pushed += write_len;
- assert(stream->chan->metadata_cache->contiguous >=
+ assert(stream->chan->metadata_cache->max_offset >=
stream->ust_metadata_pushed);
ret = write_len;
* Sync metadata, meaning request it from the session daemon and snapshot it
* so the metadata thread can consume it.
*
- * Metadata stream lock MUST be acquired.
+ * Metadata stream lock is held here, but we need to release it when
+ * interacting with the sessiond, else we cause a deadlock with live
+ * mode awaiting metadata to be pushed out.
*
* Return 0 if new metadata is available, EAGAIN if the metadata stream
* is empty or a negative value on error.
assert(ctx);
assert(metadata);
+ pthread_mutex_unlock(&metadata->lock);
/*
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
if (ret < 0) {
goto end;
}
+ pthread_mutex_lock(&metadata->lock);
ret = commit_one_metadata_packet(metadata);
if (ret <= 0) {
/*
* In live, block until all the metadata is sent.
*/
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ assert(!stream->missed_metadata_flush);
+ stream->waiting_on_metadata = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+
err = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ stream->waiting_on_metadata = false;
+ if (stream->missed_metadata_flush) {
+ stream->missed_metadata_flush = false;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ (void) consumer_flush_ust_index(stream);
+ } else {
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ }
+
if (err < 0) {
goto end;
}
uint64_t contiguous, pushed;
/* Ease our life a bit. */
- contiguous = stream->chan->metadata_cache->contiguous;
+ contiguous = stream->chan->metadata_cache->max_offset;
pushed = stream->ust_metadata_pushed;
/*
* function or any of its callees. Timers have a very strict locking
* semantic with respect to teardown. Failure to respect this semantic
* introduces deadlocks.
+ *
+ * DON'T hold the metadata lock when calling this function, else this
+ * can cause a deadlock with the consumer waiting for metadata to be
+ * pushed out, due to concurrent interaction with the session daemon.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *channel, int timer, int wait)