Test for new metadata at each packet
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 12 Sep 2013 15:04:22 +0000 (11:04 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 27 Sep 2013 17:57:14 +0000 (13:57 -0400)
After sending each data packet in live, we need to check if new metadata
is available before sending the index informing the viewer it can read
the trace.

Since the data and the metadata are handled by two different threads,
this patch introduces a rendez-vous point: if new metadata is available,
the data thread flushes the metadata stream and waits on a conditionnal
variable. When the metadata thread finishes to send its data, it wakes
up the data thread which can send its index.

That way, the viewer is informed new metadata is available before
attempting to read a packet that might require an update of the
metadata.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/main.c
src/common/consumer-stream.c
src/common/consumer-stream.h
src/common/consumer-timer.c
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/kernel-consumer/kernel-consumer.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index 43e6f318aef9eec57daecb9a6106f4b314c9296b..ba97ab3d78a05bd2cec21c564b8ce4ca6bf35262 100644 (file)
@@ -761,6 +761,50 @@ void deferred_free_session(struct rcu_head *head)
        free(session);
 }
 
        free(session);
 }
 
+static void close_stream(struct relay_stream *stream,
+               struct lttng_ht *viewer_streams_ht, struct lttng_ht *ctf_traces_ht)
+{
+       int delret;
+       struct relay_viewer_stream *vstream;
+       struct lttng_ht_iter iter;
+
+       assert(stream);
+       assert(viewer_streams_ht);
+
+       delret = close(stream->fd);
+       if (delret < 0) {
+               PERROR("close stream");
+       }
+
+       if (stream->index_fd >= 0) {
+               delret = close(stream->index_fd);
+               if (delret < 0) {
+                       PERROR("close stream index_fd");
+               }
+       }
+
+       vstream = live_find_viewer_stream_by_id(stream->stream_handle,
+                       viewer_streams_ht);
+       if (vstream) {
+               /*
+                * Set the last good value into the viewer stream. This is done
+                * right before the stream gets deleted from the hash table. The
+                * lookup failure on the live thread side of a stream indicates
+                * that the viewer stream index received value should be used.
+                */
+               vstream->total_index_received = stream->total_index_received;
+       }
+
+       iter.iter.node = &stream->stream_n.node;
+       delret = lttng_ht_del(relay_streams_ht, &iter);
+       assert(!delret);
+       iter.iter.node = &stream->ctf_trace_node.node;
+       delret = lttng_ht_del(ctf_traces_ht, &iter);
+       assert(!delret);
+       call_rcu(&stream->rcu_node, deferred_free_stream);
+       DBG("Closed tracefile %d from close stream", stream->fd);
+}
+
 /*
  * relay_delete_session: Free all memory associated with a session and
  * close all the FDs
 /*
  * relay_delete_session: Free all memory associated with a session and
  * close all the FDs
@@ -1031,12 +1075,11 @@ static
 int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                struct relay_command *cmd, struct lttng_ht *viewer_streams_ht)
 {
 int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                struct relay_command *cmd, struct lttng_ht *viewer_streams_ht)
 {
+       int ret, send_ret;
        struct relay_session *session = cmd->session;
        struct lttcomm_relayd_close_stream stream_info;
        struct lttcomm_relayd_generic_reply reply;
        struct relay_stream *stream;
        struct relay_session *session = cmd->session;
        struct lttcomm_relayd_close_stream stream_info;
        struct lttcomm_relayd_generic_reply reply;
        struct relay_stream *stream;
-       int ret, send_ret;
-       struct lttng_ht_iter iter;
 
        DBG("Close stream received");
 
 
        DBG("Close stream received");
 
@@ -1070,42 +1113,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->close_flag = 1;
 
        if (close_stream_check(stream)) {
        stream->close_flag = 1;
 
        if (close_stream_check(stream)) {
-               int delret;
-               struct relay_viewer_stream *vstream;
-
-               delret = close(stream->fd);
-               if (delret < 0) {
-                       PERROR("close stream");
-               }
-
-               if (stream->index_fd >= 0) {
-                       delret = close(stream->index_fd);
-                       if (delret < 0) {
-                               PERROR("close stream index_fd");
-                       }
-               }
-
-               vstream = live_find_viewer_stream_by_id(stream->stream_handle,
-                               viewer_streams_ht);
-               if (vstream) {
-                       /*
-                        * Set the last good value into the viewer stream. This is done
-                        * right before the stream gets deleted from the hash table. The
-                        * lookup failure on the live thread side of a stream indicates
-                        * that the viewer stream index received value should be used.
-                        */
-                       vstream->total_index_received = stream->total_index_received;
-               }
-
-               iter.iter.node = &stream->stream_n.node;
-               delret = lttng_ht_del(relay_streams_ht, &iter);
-               assert(!delret);
-               iter.iter.node = &stream->ctf_trace_node.node;
-               delret = lttng_ht_del(cmd->ctf_traces_ht, &iter);
-               assert(!delret);
-               call_rcu(&stream->rcu_node,
-                               deferred_free_stream);
-               DBG("Closed tracefile %d from close stream", stream->fd);
+               close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
        }
 
 end_unlock:
        }
 
 end_unlock:
@@ -1829,7 +1837,7 @@ end:
  */
 static
 int relay_process_data(struct relay_command *cmd,
  */
 static
 int relay_process_data(struct relay_command *cmd,
-               struct lttng_ht *indexes_ht)
+               struct lttng_ht *indexes_ht, struct lttng_ht *viewer_streams_ht)
 {
        int ret = 0, rotate_index = 0, index_created = 0;
        struct relay_stream *stream;
 {
        int ret = 0, rotate_index = 0, index_created = 0;
        struct relay_stream *stream;
@@ -2001,24 +2009,7 @@ int relay_process_data(struct relay_command *cmd,
 
        /* Check if we need to close the FD */
        if (close_stream_check(stream)) {
 
        /* Check if we need to close the FD */
        if (close_stream_check(stream)) {
-               int cret;
-               struct lttng_ht_iter iter;
-
-               cret = close(stream->fd);
-               if (cret < 0) {
-                       PERROR("close stream process data");
-               }
-
-               cret = close(stream->index_fd);
-               if (cret < 0) {
-                       PERROR("close stream index_fd");
-               }
-               iter.iter.node = &stream->stream_n.node;
-               ret = lttng_ht_del(relay_streams_ht, &iter);
-               assert(!ret);
-               call_rcu(&stream->rcu_node,
-                       deferred_free_stream);
-               DBG("Closed tracefile %d after recv data", stream->fd);
+               close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
        }
 
 end_rcu_unlock:
        }
 
 end_rcu_unlock:
@@ -2320,7 +2311,8 @@ restart:
                                                continue;
                                        }
 
                                                continue;
                                        }
 
-                                       ret = relay_process_data(relay_connection, indexes_ht);
+                                       ret = relay_process_data(relay_connection, indexes_ht,
+                                                       relay_ctx->viewer_streams_ht);
                                        /* connection closed */
                                        if (ret < 0) {
                                                relay_cleanup_poll_connection(&events, pollfd);
                                        /* connection closed */
                                        if (ret < 0) {
                                                relay_cleanup_poll_connection(&events, pollfd);
@@ -2360,9 +2352,15 @@ error:
                                        &iter, relay_connection, sessions_ht);
                }
        }
                                        &iter, relay_connection, sessions_ht);
                }
        }
-       rcu_read_unlock();
 error_poll_create:
 error_poll_create:
-       lttng_ht_destroy(indexes_ht);
+       {
+               struct relay_index *index;
+               cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
+                       relay_index_delete(index, indexes_ht);
+               }
+               lttng_ht_destroy(indexes_ht);
+       }
+       rcu_read_unlock();
 indexes_ht_error:
        lttng_ht_destroy(relay_connections_ht);
 relay_connections_ht_error:
 indexes_ht_error:
        lttng_ht_destroy(relay_connections_ht);
 relay_connections_ht_error:
index 920948760264405f16941f0069617513e9d27d6f..808cae236ded5d142a6c5b962b27e760c41eac2a 100644 (file)
@@ -25,6 +25,7 @@
 
 #include <common/common.h>
 #include <common/index/index.h>
 
 #include <common/common.h>
 #include <common/index/index.h>
+#include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 
@@ -355,3 +356,138 @@ error:
        rcu_read_unlock();
        return ret;
 }
        rcu_read_unlock();
        return ret;
 }
+
+/*
+ * Synchronize the metadata using a given session ID. A successful acquisition
+ * of a metadata stream will trigger a request to the session daemon and a
+ * snapshot so the metadata thread can consume it.
+ *
+ * This function call is a rendez-vous point between the metadata thread and
+ * the data thread.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+               uint64_t session_id)
+{
+       int ret;
+       struct lttng_consumer_stream *metadata = NULL, *stream = NULL;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht;
+
+       assert(ctx);
+
+       /* Ease our life a bit. */
+       ht = consumer_data.stream_list_ht;
+
+       rcu_read_lock();
+
+       /* Search the metadata associated with the session id of the given stream. */
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+                       &session_id, &iter.iter, stream, node_session_id.node) {
+               if (stream->metadata_flag) {
+                       metadata = stream;
+                       break;
+               }
+       }
+       if (!metadata) {
+               ret = 0;
+               goto end_unlock_rcu;
+       }
+
+       /*
+        * In UST, since we have to write the metadata from the cache packet
+        * by packet, we might need to start this procedure multiple times
+        * until all the metadata from the cache has been extracted.
+        */
+       do {
+               /*
+                * Steps :
+                * - Lock the metadata stream
+                * - Check if metadata stream node was deleted before locking.
+                *   - if yes, release and return success
+                * - Check if new metadata is ready (flush + snapshot pos)
+                * - If nothing : release and return.
+                * - Lock the metadata_rdv_lock
+                * - Unlock the metadata stream
+                * - cond_wait on metadata_rdv to wait the wakeup from the
+                *   metadata thread
+                * - Unlock the metadata_rdv_lock
+                */
+               pthread_mutex_lock(&metadata->lock);
+
+               /*
+                * There is a possibility that we were able to acquire a reference on the
+                * stream from the RCU hash table but between then and now, the node might
+                * have been deleted just before the lock is acquired. Thus, after locking,
+                * we make sure the metadata node has not been deleted which means that the
+                * buffers are closed.
+                *
+                * In that case, there is no need to sync the metadata hence returning a
+                * success return code.
+                */
+               ret = cds_lfht_is_node_deleted(&metadata->node.node);
+               if (ret) {
+                       ret = 0;
+                       goto end_unlock_mutex;
+               }
+
+               switch (ctx->type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       /*
+                        * Empty the metadata cache and flush the current stream.
+                        */
+                       ret = lttng_kconsumer_sync_metadata(metadata);
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Ask the sessiond if we have new metadata waiting and update the
+                        * consumer metadata cache.
+                        */
+                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+                       break;
+               default:
+                       assert(0);
+                       ret = -1;
+                       break;
+               }
+               /*
+                * Error or no new metadata, we exit here.
+                */
+               if (ret <= 0 || ret == ENODATA) {
+                       goto end_unlock_mutex;
+               }
+
+               /*
+                * At this point, new metadata have been flushed, so we wait on the
+                * rendez-vous point for the metadata thread to wake us up when it
+                * finishes consuming the metadata and continue execution.
+                */
+
+               pthread_mutex_lock(&metadata->metadata_rdv_lock);
+
+               /*
+                * Release metadata stream lock so the metadata thread can process it.
+                */
+               pthread_mutex_unlock(&metadata->lock);
+
+               /*
+                * Wait on the rendez-vous point. Once woken up, it means the metadata was
+                * consumed and thus synchronization is achieved.
+                */
+               pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
+               pthread_mutex_unlock(&metadata->metadata_rdv_lock);
+       } while (ret == EAGAIN);
+
+       ret = 0;
+       goto end_unlock_rcu;
+
+end_unlock_mutex:
+       pthread_mutex_unlock(&metadata->lock);
+end_unlock_rcu:
+       rcu_read_unlock();
+       return ret;
+}
index 956bb6328692215984f5081102f48ae5f6b3de27..79efa721e80388a934343254e56a8ac3a602dd8b 100644 (file)
@@ -74,4 +74,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream);
 int consumer_stream_write_index(struct lttng_consumer_stream *stream,
                struct lttng_packet_index *index);
 
 int consumer_stream_write_index(struct lttng_consumer_stream *stream,
                struct lttng_packet_index *index);
 
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+               uint64_t session_id);
+
 #endif /* LTTNG_CONSUMER_STREAM_H */
 #endif /* LTTNG_CONSUMER_STREAM_H */
index e2be05e7e731b8355235e3d040365545c5828bbc..0f5d4ba9655d6985a656b069fcb83843d310f3a9 100644 (file)
@@ -100,7 +100,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
                 * they are held while consumer_timer_switch_stop() is
                 * called.
                 */
                 * they are held while consumer_timer_switch_stop() is
                 * called.
                 */
-               ret = lttng_ustconsumer_request_metadata(ctx, channel, 1);
+               ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
                if (ret < 0) {
                        channel->switch_timer_error = 1;
                }
                if (ret < 0) {
                        channel->switch_timer_error = 1;
                }
@@ -186,6 +186,11 @@ static int check_ust_stream(struct lttng_consumer_stream *stream)
         * safely send the empty index.
         */
        pthread_mutex_lock(&stream->lock);
         * safely send the empty index.
         */
        pthread_mutex_lock(&stream->lock);
+       ret = cds_lfht_is_node_deleted(&stream->node.node);
+       if (ret) {
+               goto error_unlock;
+       }
+
        ret = ustctl_get_current_timestamp(stream->ustream, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
        ret = ustctl_get_current_timestamp(stream->ustream, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
@@ -194,7 +199,7 @@ static int check_ust_stream(struct lttng_consumer_stream *stream)
        ustctl_flush_buffer(stream->ustream, 1);
        ret = ustctl_snapshot(stream->ustream);
        if (ret < 0) {
        ustctl_flush_buffer(stream->ustream, 1);
        ret = ustctl_snapshot(stream->ustream);
        if (ret < 0) {
-               if (errno != EAGAIN) {
+               if (ret != -EAGAIN) {
                        ERR("Taking UST snapshot");
                        ret = -1;
                        goto error_unlock;
                        ERR("Taking UST snapshot");
                        ret = -1;
                        goto error_unlock;
index b8695698da7301b69b114fe373680b248294b486..2420d110b3e8f1449a01a3d41ca941e1099bf4d1 100644 (file)
@@ -520,6 +520,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                stream->metadata_flag = 1;
                /* Metadata is flat out. */
                strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
                stream->metadata_flag = 1;
                /* Metadata is flat out. */
                strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+               /* Live rendez-vous point. */
+               pthread_cond_init(&stream->metadata_rdv, NULL);
+               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
        } else {
                /* Format stream name to <channel_name>_<cpu_number> */
                ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
        } else {
                /* Format stream name to <channel_name>_<cpu_number> */
                ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
@@ -3062,6 +3065,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ssize_t ret;
 
        pthread_mutex_lock(&stream->lock);
        ssize_t ret;
 
        pthread_mutex_lock(&stream->lock);
+       if (stream->metadata_flag) {
+               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3078,6 +3084,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
                break;
        }
 
+       if (stream->metadata_flag) {
+               pthread_cond_broadcast(&stream->metadata_rdv);
+               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        return ret;
 }
        pthread_mutex_unlock(&stream->lock);
        return ret;
 }
index 2bf572303c94d7ce63b839eb820f00d04116de9f..aef7f560e4dcae78e62343eae1492de6ea85434b 100644 (file)
@@ -334,6 +334,12 @@ struct lttng_consumer_stream {
         * FD of the index file for this stream.
         */
        int index_fd;
         * FD of the index file for this stream.
         */
        int index_fd;
+
+       /*
+        * Rendez-vous point between data and metadata stream in live mode.
+        */
+       pthread_cond_t metadata_rdv;
+       pthread_mutex_t metadata_rdv_lock;
 };
 
 /*
 };
 
 /*
index 4618ccedcd750d18d651c4ace97477dd620f9843..d02e8502d99914c0acf5652d9835bd71f20b540e 100644 (file)
@@ -504,7 +504,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = consumer_add_channel(new_channel, ctx);
                }
                } else {
                        ret = consumer_add_channel(new_channel, ctx);
                }
-               consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval);
+               if (CONSUMER_CHANNEL_TYPE_DATA) {
+                       consumer_timer_live_start(new_channel,
+                                       msg.u.channel.live_timer_interval);
+               }
 
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
 
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
@@ -902,6 +905,42 @@ static int get_index_values(struct lttng_packet_index *index, int infd)
 error:
        return ret;
 }
 error:
        return ret;
 }
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+       int ret;
+
+       assert(metadata);
+
+       ret = kernctl_buffer_flush(metadata->wait_fd);
+       if (ret < 0) {
+               ERR("Failed to flush kernel stream");
+               goto end;
+       }
+
+       ret = kernctl_snapshot(metadata->wait_fd);
+       if (ret < 0) {
+               if (errno != EAGAIN) {
+                       ERR("Sync metadata, taking kernel snapshot failed.");
+                       goto end;
+               }
+               DBG("Sync metadata, no new kernel metadata");
+               /* No new metadata, exit. */
+               ret = ENODATA;
+               goto end;
+       }
+
+end:
+       return ret;
+}
 
 /*
  * Consume data on a file descriptor and write it on a trace file.
 
 /*
  * Consume data on a file descriptor and write it on a trace file.
@@ -1032,6 +1071,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
                goto end;
        }
 
+       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+               /*
+                * In live, block until all the metadata is sent.
+                */
+               err = consumer_stream_sync_metadata(ctx, stream->session_id);
+               if (err < 0) {
+                       goto end;
+               }
+       }
+
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
                goto end;
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
                goto end;
index d8f74ca1f429161c4ff9a3e78a2fabf3e7e002cb..1aad2733b28aa8def996b0a315e952eb29b43879 100644 (file)
@@ -32,5 +32,6 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream);
                struct lttng_consumer_local_data *ctx);
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream);
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata);
 
 #endif /* _LTTNG_KCONSUMER_H */
 
 #endif /* _LTTNG_KCONSUMER_H */
index 192217b4e0eea6557e02e25d4df648bc12c81987..2d3d89c7c66e4349aa9e5141a5229ade6ba32451 100644 (file)
@@ -800,7 +800,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
        if (ret < 0) {
                goto error;
        }
        if (ret < 0) {
                goto error;
        }
@@ -835,18 +835,13 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
                metadata_stream->tracefile_size_current = 0;
        }
 
                metadata_stream->tracefile_size_current = 0;
        }
 
-       pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
-
        do {
                ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
                if (ret < 0) {
        do {
                ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto error_stream;
                }
        } while (ret > 0);
 
                }
        } while (ret > 0);
 
-error_unlock:
-       pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
-
 error_stream:
        /*
         * Clean up the stream completly because the next snapshot will use a new
 error_stream:
        /*
         * Clean up the stream completly because the next snapshot will use a new
@@ -1024,7 +1019,7 @@ error:
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
-               int timer)
+               int timer, int wait)
 {
        int ret, ret_code = LTTNG_OK;
        char *metadata_str;
 {
        int ret, ret_code = LTTNG_OK;
        char *metadata_str;
@@ -1061,6 +1056,9 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
 
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
 
+       if (!wait) {
+               goto end_free;
+       }
        while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
                DBG("Waiting for metadata to be flushed");
                usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
        while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
                DBG("Waiting for metadata to be flushed");
                usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
@@ -1253,10 +1251,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                        consumer_timer_switch_start(channel, attr.switch_timer_interval);
                        attr.switch_timer_interval = 0;
                        }
                        consumer_timer_switch_start(channel, attr.switch_timer_interval);
                        attr.switch_timer_interval = 0;
+               } else {
+                       consumer_timer_live_start(channel,
+                                       msg.u.ask_channel.live_timer_interval);
                }
 
                }
 
-               consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
-
                /*
                 * Add the channel to the internal state AFTER all streams were created
                 * and successfully sent to session daemon. This way, all streams must
                /*
                 * Add the channel to the internal state AFTER all streams were created
                 * and successfully sent to session daemon. This way, all streams must
@@ -1410,7 +1409,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
 
                ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
                }
 
                ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
-                               len, channel, 0);
+                               len, channel, 0, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_fatal;
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_fatal;
@@ -1665,7 +1664,112 @@ error:
        return ret;
 }
 
        return ret;
 }
 
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed in the cache, or a negative value
+ * on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+       ssize_t write_len;
+       int ret;
+
+       pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+       if (stream->chan->metadata_cache->contiguous
+                       == stream->ust_metadata_pushed) {
+               ret = 0;
+               goto end;
+       }
+
+       write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+                       &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+                       stream->chan->metadata_cache->contiguous
+                       - stream->ust_metadata_pushed);
+       assert(write_len != 0);
+       if (write_len < 0) {
+               ERR("Writing one metadata packet");
+               ret = -1;
+               goto end;
+       }
+       stream->ust_metadata_pushed += write_len;
+
+       assert(stream->chan->metadata_cache->contiguous >=
+                       stream->ust_metadata_pushed);
+       ret = write_len;
+
+end:
+       pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+       return ret;
+}
+
 
 
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *metadata)
+{
+       int ret;
+       int retry = 0;
+
+       assert(ctx);
+       assert(metadata);
+
+       /*
+        * Request metadata from the sessiond, but don't wait for the flush
+        * because we locked the metadata thread.
+        */
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = commit_one_metadata_packet(metadata);
+       if (ret <= 0) {
+               goto end;
+       } else if (ret > 0) {
+               retry = 1;
+       }
+
+       ustctl_flush_buffer(metadata->ustream, 1);
+       ret = ustctl_snapshot(metadata->ustream);
+       if (ret < 0) {
+               if (errno != EAGAIN) {
+                       ERR("Sync metadata, taking UST snapshot");
+                       goto end;
+               }
+               DBG("No new metadata when syncing them.");
+               /* No new metadata, exit. */
+               ret = ENODATA;
+               goto end;
+       }
+
+       /*
+        * After this flush, we still need to extract metadata.
+        */
+       if (retry) {
+               ret = EAGAIN;
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Read subbuffer from the given stream.
+ *
+ * Stream lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
@@ -1708,25 +1812,10 @@ retry:
                 * already been read.
                 */
                if (stream->metadata_flag) {
                 * already been read.
                 */
                if (stream->metadata_flag) {
-                       ssize_t write_len;
-
-                       if (stream->chan->metadata_cache->contiguous
-                                       == stream->ust_metadata_pushed) {
-                               ret = 0;
+                       ret = commit_one_metadata_packet(stream);
+                       if (ret <= 0) {
                                goto end;
                        }
                                goto end;
                        }
-
-                       write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
-                                       &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
-                                       stream->chan->metadata_cache->contiguous
-                                               - stream->ust_metadata_pushed);
-                       assert(write_len != 0);
-                       if (write_len < 0) {
-                               ERR("Writing one metadata packet");
-                               ret = -1;
-                               goto end;
-                       }
-                       stream->ust_metadata_pushed += write_len;
                        ustctl_flush_buffer(stream->ustream, 1);
                        goto retry;
                }
                        ustctl_flush_buffer(stream->ustream, 1);
                        goto retry;
                }
@@ -1795,6 +1884,16 @@ retry:
                goto end;
        }
 
                goto end;
        }
 
+       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+               /*
+                * In live, block until all the metadata is sent.
+                */
+               err = consumer_stream_sync_metadata(ctx, stream->session_id);
+               if (err < 0) {
+                       goto end;
+               }
+       }
+
        assert(!stream->metadata_flag);
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
        assert(!stream->metadata_flag);
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
@@ -1965,7 +2064,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  * introduces deadlocks.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
  * introduces deadlocks.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel, int timer)
+               struct lttng_consumer_channel *channel, int timer, int wait)
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
@@ -2059,7 +2158,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        }
 
        ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
        }
 
        ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
-                       key, offset, len, channel, timer);
+                       key, offset, len, channel, timer, wait);
        if (ret_code >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
        if (ret_code >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
index c10cd13a2876152ed305c94c72ec64154276a373..bf005c30d372661d43d62ad103a575e1419a0f83 100644 (file)
@@ -55,9 +55,11 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *ht);
 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream);
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream);
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
-               int timer);
+               int timer, int wait);
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel, int timer);
+               struct lttng_consumer_channel *channel, int timer, int wait);
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *metadata);
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
@@ -176,6 +178,12 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 {
        return -ENOSYS;
 }
 {
        return -ENOSYS;
 }
+static inline
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *metadata)
+{
+       return -ENOSYS;
+}
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
This page took 0.041428 seconds and 4 git commands to generate.