free(session);
}
+static void close_stream(struct relay_stream *stream,
+ struct lttng_ht *viewer_streams_ht, struct lttng_ht *ctf_traces_ht)
+{
+ int delret;
+ struct relay_viewer_stream *vstream;
+ struct lttng_ht_iter iter;
+
+ assert(stream);
+ assert(viewer_streams_ht);
+
+ delret = close(stream->fd);
+ if (delret < 0) {
+ PERROR("close stream");
+ }
+
+ if (stream->index_fd >= 0) {
+ delret = close(stream->index_fd);
+ if (delret < 0) {
+ PERROR("close stream index_fd");
+ }
+ }
+
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle,
+ viewer_streams_ht);
+ if (vstream) {
+ /*
+ * Set the last good value into the viewer stream. This is done
+ * right before the stream gets deleted from the hash table. The
+ * lookup failure on the live thread side of a stream indicates
+ * that the viewer stream index received value should be used.
+ */
+ vstream->total_index_received = stream->total_index_received;
+ }
+
+ iter.iter.node = &stream->stream_n.node;
+ delret = lttng_ht_del(relay_streams_ht, &iter);
+ assert(!delret);
+ iter.iter.node = &stream->ctf_trace_node.node;
+ delret = lttng_ht_del(ctf_traces_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node, deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+}
+
/*
* relay_delete_session: Free all memory associated with a session and
* close all the FDs
int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd, struct lttng_ht *viewer_streams_ht)
{
+ int ret, send_ret;
struct relay_session *session = cmd->session;
struct lttcomm_relayd_close_stream stream_info;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
- int ret, send_ret;
- struct lttng_ht_iter iter;
DBG("Close stream received");
stream->close_flag = 1;
if (close_stream_check(stream)) {
- int delret;
- struct relay_viewer_stream *vstream;
-
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
- }
-
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
- }
- }
-
- vstream = live_find_viewer_stream_by_id(stream->stream_handle,
- viewer_streams_ht);
- if (vstream) {
- /*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
- */
- vstream->total_index_received = stream->total_index_received;
- }
-
- iter.iter.node = &stream->stream_n.node;
- delret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!delret);
- iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(cmd->ctf_traces_ht, &iter);
- assert(!delret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d from close stream", stream->fd);
+ close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
}
end_unlock:
*/
static
int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht)
+ struct lttng_ht *indexes_ht, struct lttng_ht *viewer_streams_ht)
{
int ret = 0, rotate_index = 0, index_created = 0;
struct relay_stream *stream;
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- int cret;
- struct lttng_ht_iter iter;
-
- cret = close(stream->fd);
- if (cret < 0) {
- PERROR("close stream process data");
- }
-
- cret = close(stream->index_fd);
- if (cret < 0) {
- PERROR("close stream index_fd");
- }
- iter.iter.node = &stream->stream_n.node;
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d after recv data", stream->fd);
+ close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
}
end_rcu_unlock:
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht);
+ ret = relay_process_data(relay_connection, indexes_ht,
+ relay_ctx->viewer_streams_ht);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
&iter, relay_connection, sessions_ht);
}
}
- rcu_read_unlock();
error_poll_create:
- lttng_ht_destroy(indexes_ht);
+ {
+ struct relay_index *index;
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
+ relay_index_delete(index, indexes_ht);
+ }
+ lttng_ht_destroy(indexes_ht);
+ }
+ rcu_read_unlock();
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
#include <common/common.h>
#include <common/index/index.h>
+#include <common/kernel-consumer/kernel-consumer.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
rcu_read_unlock();
return ret;
}
+
+/*
+ * Synchronize the metadata using a given session ID. A successful acquisition
+ * of a metadata stream will trigger a request to the session daemon and a
+ * snapshot so the metadata thread can consume it.
+ *
+ * This function call is a rendez-vous point between the metadata thread and
+ * the data thread.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+ uint64_t session_id)
+{
+ int ret;
+ struct lttng_consumer_stream *metadata = NULL, *stream = NULL;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+
+ assert(ctx);
+
+ /* Ease our life a bit. */
+ ht = consumer_data.stream_list_ht;
+
+ rcu_read_lock();
+
+ /* Search the metadata associated with the session id of the given stream. */
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+ &session_id, &iter.iter, stream, node_session_id.node) {
+ if (stream->metadata_flag) {
+ metadata = stream;
+ break;
+ }
+ }
+ if (!metadata) {
+ ret = 0;
+ goto end_unlock_rcu;
+ }
+
+ /*
+ * In UST, since we have to write the metadata from the cache packet
+ * by packet, we might need to start this procedure multiple times
+ * until all the metadata from the cache has been extracted.
+ */
+ do {
+ /*
+ * Steps :
+ * - Lock the metadata stream
+ * - Check if metadata stream node was deleted before locking.
+ * - if yes, release and return success
+ * - Check if new metadata is ready (flush + snapshot pos)
+ * - If nothing : release and return.
+ * - Lock the metadata_rdv_lock
+ * - Unlock the metadata stream
+ * - cond_wait on metadata_rdv to wait the wakeup from the
+ * metadata thread
+ * - Unlock the metadata_rdv_lock
+ */
+ pthread_mutex_lock(&metadata->lock);
+
+ /*
+ * There is a possibility that we were able to acquire a reference on the
+ * stream from the RCU hash table but between then and now, the node might
+ * have been deleted just before the lock is acquired. Thus, after locking,
+ * we make sure the metadata node has not been deleted which means that the
+ * buffers are closed.
+ *
+ * In that case, there is no need to sync the metadata hence returning a
+ * success return code.
+ */
+ ret = cds_lfht_is_node_deleted(&metadata->node.node);
+ if (ret) {
+ ret = 0;
+ goto end_unlock_mutex;
+ }
+
+ switch (ctx->type) {
+ case LTTNG_CONSUMER_KERNEL:
+ /*
+ * Empty the metadata cache and flush the current stream.
+ */
+ ret = lttng_kconsumer_sync_metadata(metadata);
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Ask the sessiond if we have new metadata waiting and update the
+ * consumer metadata cache.
+ */
+ ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+ break;
+ default:
+ assert(0);
+ ret = -1;
+ break;
+ }
+ /*
+ * Error or no new metadata, we exit here.
+ */
+ if (ret <= 0 || ret == ENODATA) {
+ goto end_unlock_mutex;
+ }
+
+ /*
+ * At this point, new metadata have been flushed, so we wait on the
+ * rendez-vous point for the metadata thread to wake us up when it
+ * finishes consuming the metadata and continue execution.
+ */
+
+ pthread_mutex_lock(&metadata->metadata_rdv_lock);
+
+ /*
+ * Release metadata stream lock so the metadata thread can process it.
+ */
+ pthread_mutex_unlock(&metadata->lock);
+
+ /*
+ * Wait on the rendez-vous point. Once woken up, it means the metadata was
+ * consumed and thus synchronization is achieved.
+ */
+ pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
+ pthread_mutex_unlock(&metadata->metadata_rdv_lock);
+ } while (ret == EAGAIN);
+
+ ret = 0;
+ goto end_unlock_rcu;
+
+end_unlock_mutex:
+ pthread_mutex_unlock(&metadata->lock);
+end_unlock_rcu:
+ rcu_read_unlock();
+ return ret;
+}
int consumer_stream_write_index(struct lttng_consumer_stream *stream,
struct lttng_packet_index *index);
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+ uint64_t session_id);
+
#endif /* LTTNG_CONSUMER_STREAM_H */
* they are held while consumer_timer_switch_stop() is
* called.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, channel, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
if (ret < 0) {
channel->switch_timer_error = 1;
}
* safely send the empty index.
*/
pthread_mutex_lock(&stream->lock);
+ ret = cds_lfht_is_node_deleted(&stream->node.node);
+ if (ret) {
+ goto error_unlock;
+ }
+
ret = ustctl_get_current_timestamp(stream->ustream, &ts);
if (ret < 0) {
ERR("Failed to get the current timestamp");
ustctl_flush_buffer(stream->ustream, 1);
ret = ustctl_snapshot(stream->ustream);
if (ret < 0) {
- if (errno != EAGAIN) {
+ if (ret != -EAGAIN) {
ERR("Taking UST snapshot");
ret = -1;
goto error_unlock;
stream->metadata_flag = 1;
/* Metadata is flat out. */
strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+ /* Live rendez-vous point. */
+ pthread_cond_init(&stream->metadata_rdv, NULL);
+ pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
} else {
/* Format stream name to <channel_name>_<cpu_number> */
ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
ssize_t ret;
pthread_mutex_lock(&stream->lock);
+ if (stream->metadata_flag) {
+ pthread_mutex_lock(&stream->metadata_rdv_lock);
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
}
+ if (stream->metadata_flag) {
+ pthread_cond_broadcast(&stream->metadata_rdv);
+ pthread_mutex_unlock(&stream->metadata_rdv_lock);
+ }
pthread_mutex_unlock(&stream->lock);
return ret;
}
* FD of the index file for this stream.
*/
int index_fd;
+
+ /*
+ * Rendez-vous point between data and metadata stream in live mode.
+ */
+ pthread_cond_t metadata_rdv;
+ pthread_mutex_t metadata_rdv_lock;
};
/*
} else {
ret = consumer_add_channel(new_channel, ctx);
}
- consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval);
+ if (CONSUMER_CHANNEL_TYPE_DATA) {
+ consumer_timer_live_start(new_channel,
+ msg.u.channel.live_timer_interval);
+ }
/* If we received an error in add_channel, we need to report it. */
if (ret < 0) {
error:
return ret;
}
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+ int ret;
+
+ assert(metadata);
+
+ ret = kernctl_buffer_flush(metadata->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+
+ ret = kernctl_snapshot(metadata->wait_fd);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking kernel snapshot failed.");
+ goto end;
+ }
+ DBG("Sync metadata, no new kernel metadata");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+end:
+ return ret;
+}
/*
* Consume data on a file descriptor and write it on a trace file.
goto end;
}
+ if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+ /*
+ * In live, block until all the metadata is sent.
+ */
+ err = consumer_stream_sync_metadata(ctx, stream->session_id);
+ if (err < 0) {
+ goto end;
+ }
+ }
+
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
goto end;
struct lttng_consumer_local_data *ctx);
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream);
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata);
#endif /* _LTTNG_KCONSUMER_H */
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
if (ret < 0) {
goto error;
}
metadata_stream->tracefile_size_current = 0;
}
- pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
-
do {
ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
if (ret < 0) {
- goto error_unlock;
+ goto error_stream;
}
} while (ret > 0);
-error_unlock:
- pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
-
error_stream:
/*
* Clean up the stream completly because the next snapshot will use a new
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, struct lttng_consumer_channel *channel,
- int timer)
+ int timer, int wait)
{
int ret, ret_code = LTTNG_OK;
char *metadata_str;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ if (!wait) {
+ goto end_free;
+ }
while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
DBG("Waiting for metadata to be flushed");
usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
}
consumer_timer_switch_start(channel, attr.switch_timer_interval);
attr.switch_timer_interval = 0;
+ } else {
+ consumer_timer_live_start(channel,
+ msg.u.ask_channel.live_timer_interval);
}
- consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
-
/*
* Add the channel to the internal state AFTER all streams were created
* and successfully sent to session daemon. This way, all streams must
}
ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
- len, channel, 0);
+ len, channel, 0, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_fatal;
return ret;
}
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed in the cache, or a negative value
+ * on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+ ssize_t write_len;
+ int ret;
+
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ if (stream->chan->metadata_cache->contiguous
+ == stream->ust_metadata_pushed) {
+ ret = 0;
+ goto end;
+ }
+
+ write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+ &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+ stream->chan->metadata_cache->contiguous
+ - stream->ust_metadata_pushed);
+ assert(write_len != 0);
+ if (write_len < 0) {
+ ERR("Writing one metadata packet");
+ ret = -1;
+ goto end;
+ }
+ stream->ust_metadata_pushed += write_len;
+
+ assert(stream->chan->metadata_cache->contiguous >=
+ stream->ust_metadata_pushed);
+ ret = write_len;
+
+end:
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ return ret;
+}
+
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata)
+{
+ int ret;
+ int retry = 0;
+
+ assert(ctx);
+ assert(metadata);
+
+ /*
+ * Request metadata from the sessiond, but don't wait for the flush
+ * because we locked the metadata thread.
+ */
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = commit_one_metadata_packet(metadata);
+ if (ret <= 0) {
+ goto end;
+ } else if (ret > 0) {
+ retry = 1;
+ }
+
+ ustctl_flush_buffer(metadata->ustream, 1);
+ ret = ustctl_snapshot(metadata->ustream);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking UST snapshot");
+ goto end;
+ }
+ DBG("No new metadata when syncing them.");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+ /*
+ * After this flush, we still need to extract metadata.
+ */
+ if (retry) {
+ ret = EAGAIN;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Read subbuffer from the given stream.
+ *
+ * Stream lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
* already been read.
*/
if (stream->metadata_flag) {
- ssize_t write_len;
-
- if (stream->chan->metadata_cache->contiguous
- == stream->ust_metadata_pushed) {
- ret = 0;
+ ret = commit_one_metadata_packet(stream);
+ if (ret <= 0) {
goto end;
}
-
- write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
- &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
- stream->chan->metadata_cache->contiguous
- - stream->ust_metadata_pushed);
- assert(write_len != 0);
- if (write_len < 0) {
- ERR("Writing one metadata packet");
- ret = -1;
- goto end;
- }
- stream->ust_metadata_pushed += write_len;
ustctl_flush_buffer(stream->ustream, 1);
goto retry;
}
goto end;
}
+ if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+ /*
+ * In live, block until all the metadata is sent.
+ */
+ err = consumer_stream_sync_metadata(ctx, stream->session_id);
+ if (err < 0) {
+ goto end;
+ }
+ }
+
assert(!stream->metadata_flag);
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
* introduces deadlocks.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer)
+ struct lttng_consumer_channel *channel, int timer, int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
}
ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, channel, timer);
+ key, offset, len, channel, timer, wait);
if (ret_code >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream);
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, struct lttng_consumer_channel *channel,
- int timer);
+ int timer, int wait);
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer);
+ struct lttng_consumer_channel *channel, int timer, int wait);
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata);
#else /* HAVE_LIBLTTNG_UST_CTL */
{
return -ENOSYS;
}
+static inline
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata)
+{
+ return -ENOSYS;
+}
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTTNG_USTCONSUMER_H */