Add UST snapshot support
authorJulien Desfossez <jdesfossez@efficios.com>
Tue, 18 Jun 2013 19:25:53 +0000 (15:25 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Thu, 4 Jul 2013 16:21:25 +0000 (12:21 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
12 files changed:
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/snapshot.c
src/bin/lttng-sessiond/snapshot.h
src/bin/lttng-sessiond/ust-consumer.c
src/common/consumer-stream.c
src/common/consumer-stream.h
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index ed4d6c86f1a0b569892b0144c6f055a73983fbb1..a6a81c5dbd79989b474ffa1ee5b5d3c5edc83389 100644 (file)
@@ -25,6 +25,7 @@
 #include <common/common.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/relayd/relayd.h>
+#include <common/utils.h>
 
 #include "channel.h"
 #include "consumer.h"
@@ -2403,6 +2404,14 @@ static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
        assert(output);
        assert(session);
 
+       /* Get the datetime for the snapshot output directory. */
+       ret = utils_get_current_time_str("%Y%m%d-%H%M%S", output->datetime,
+                       sizeof(output->datetime));
+       if (!ret) {
+               ret = -EINVAL;
+               goto error;
+       }
+
        if (!output->kernel_sockets_copied) {
                ret = consumer_copy_sockets(output->consumer, ksess->consumer);
                if (ret < 0) {
@@ -2439,6 +2448,14 @@ static int record_ust_snapshot(struct ltt_ust_session *usess,
        assert(output);
        assert(session);
 
+       /* Get the datetime for the snapshot output directory. */
+       ret = utils_get_current_time_str("%Y%m%d-%H%M%S", output->datetime,
+                       sizeof(output->datetime));
+       if (!ret) {
+               ret = -EINVAL;
+               goto error;
+       }
+
        if (!output->ust_sockets_copied) {
                ret = consumer_copy_sockets(output->consumer, usess->consumer);
                if (ret < 0) {
@@ -2555,7 +2572,7 @@ int cmd_snapshot_record(struct ltt_session *session,
                        rcu_read_lock();
                        cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
                                        &iter.iter, sout, node.node) {
-                               ret = record_ust_snapshot(usess, tmp_sout, session, wait);
+                               ret = record_ust_snapshot(usess, sout, session, wait);
                                if (ret < 0) {
                                        rcu_read_unlock();
                                        goto error;
index 36883766955e07847503236d7f62334364033c33..d3c561c9ec6932531fc3c46a1a464835dc45e12f 100644 (file)
@@ -28,7 +28,6 @@
 #include <common/common.h>
 #include <common/defaults.h>
 #include <common/uri.h>
-#include <common/utils.h>
 
 #include "consumer.h"
 #include "health.h"
@@ -752,9 +751,11 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
 
        memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
 
-       strncpy(msg->u.ask_channel.pathname, pathname,
-                       sizeof(msg->u.ask_channel.pathname));
-       msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0';
+       if (pathname) {
+               strncpy(msg->u.ask_channel.pathname, pathname,
+                               sizeof(msg->u.ask_channel.pathname));
+               msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0';
+       }
 
        strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
        msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
@@ -1198,7 +1199,6 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                const char *session_path, int wait)
 {
        int ret;
-       char datetime[16];
        struct lttcomm_consumer_msg msg;
 
        assert(socket);
@@ -1208,13 +1208,6 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
 
        DBG("Consumer snapshot channel key %" PRIu64, key);
 
-       ret = utils_get_current_time_str("%Y%m%d-%H%M%S", datetime,
-                       sizeof(datetime));
-       if (!ret) {
-               ret = -EINVAL;
-               goto error;
-       }
-
        memset(&msg, 0, sizeof(msg));
        msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
        msg.u.snapshot_channel.key = key;
@@ -1226,7 +1219,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                msg.u.snapshot_channel.use_relayd = 1;
                ret = snprintf(msg.u.snapshot_channel.pathname,
                                sizeof(msg.u.snapshot_channel.pathname), "%s/%s-%s%s",
-                               output->consumer->subdir, output->name, datetime,
+                               output->consumer->subdir, output->name, output->datetime,
                                session_path);
                if (ret < 0) {
                        ret = -LTTNG_ERR_NOMEM;
@@ -1235,8 +1228,8 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
        } else {
                ret = snprintf(msg.u.snapshot_channel.pathname,
                                sizeof(msg.u.snapshot_channel.pathname), "%s/%s-%s%s",
-                               output->consumer->dst.trace_path, output->name, datetime,
-                               session_path);
+                               output->consumer->dst.trace_path, output->name,
+                               output->datetime, session_path);
                if (ret < 0) {
                        ret = -LTTNG_ERR_NOMEM;
                        goto error;
index 5265cf5575d692f94c4f7cde7d1c3d6ea59ea40a..b14a398c343ba58d2cab3a50b726f2d33475b515 100644 (file)
@@ -214,11 +214,6 @@ error:
        return output;
 }
 
-struct snapshot *snapshot_alloc(void)
-{
-       return zmalloc(sizeof(struct snapshot));
-}
-
 /*
  * Initialized a snapshot object that was already allocated.
  *
index 91215272ac83f3316348c0c10ae098c6b26b5fbf..bf594b5fe3f79d7cb3bd99cdd67afec2b483a9a5 100644 (file)
@@ -36,6 +36,12 @@ struct snapshot_output {
        struct consumer_output *consumer;
        int kernel_sockets_copied;
        int ust_sockets_copied;
+       /*
+        * Contains the string with "<date>-<time>" for when the snapshot command
+        * is triggered. This is to make sure every streams will use the same time
+        * for the directory output.
+        */
+       char datetime[16];
 
        /* Indexed by ID. */
        struct lttng_ht_node_ulong node;
index d85e32aa0ce9f2a011887fd4b9a1c70e541ccca0..9f3557cd0d02efa77f0e41be54bcc06d670b210e 100644 (file)
@@ -118,10 +118,12 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
        DBG2("Asking UST consumer for channel");
 
        /* Get and create full trace path of session. */
-       pathname = setup_trace_path(consumer, ua_sess);
-       if (!pathname) {
-               ret = -1;
-               goto error;
+       if (ua_sess->output_traces) {
+               pathname = setup_trace_path(consumer, ua_sess);
+               if (!pathname) {
+                       ret = -1;
+                       goto error;
+               }
        }
 
        /* Depending on the buffer type, a different channel key is used. */
@@ -176,7 +178,9 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
        /* Communication protocol error. */
        assert(key == ua_chan->key);
        /* We need at least one where 1 stream for 1 cpu. */
-       assert(ua_chan->expected_stream_count > 0);
+       if (ua_sess->output_traces) {
+               assert(ua_chan->expected_stream_count > 0);
+       }
 
        DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key,
                        ua_chan->expected_stream_count);
index b5ab6c6bee2dc38ef615ff84f87129354abf86a2..027d751508cf69b76411beae1626d3c0fd847aac 100644 (file)
@@ -19,6 +19,7 @@
 
 #define _GNU_SOURCE
 #include <assert.h>
+#include <inttypes.h>
 #include <sys/mman.h>
 #include <unistd.h>
 
@@ -80,6 +81,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
                        uatomic_read(&relayd->destroy_flag)) {
                consumer_destroy_relayd(relayd);
        }
+       stream->net_seq_idx = (uint64_t) -1ULL;
 }
 
 /*
@@ -110,11 +112,11 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                        if (ret) {
                                PERROR("close");
                        }
+                       stream->wait_fd = -1;
                }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_del_stream(stream);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -127,6 +129,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                if (ret) {
                        PERROR("close");
                }
+               stream->out_fd = -1;
        }
 
        /* Check and cleanup relayd if needed. */
@@ -151,6 +154,8 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream,
        struct lttng_ht_iter iter;
 
        assert(stream);
+       /* Should NEVER be called not in monitor mode. */
+       assert(stream->chan->monitor);
 
        rcu_read_lock();
 
@@ -177,15 +182,9 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
-       /*
-        * For a *non* monitored stream, we MUST NOT decrement or else the data
-        * thread will use the wrong value or stream for its local stream set.
-        */
-       if (stream->chan->monitor) {
-               /* Decrement the stream count of the global consumer data. */
-               assert(consumer_data.stream_count > 0);
-               consumer_data.stream_count--;
-       }
+       /* Decrement the stream count of the global consumer data. */
+       assert(consumer_data.stream_count > 0);
+       consumer_data.stream_count--;
 }
 
 /*
@@ -199,44 +198,110 @@ void consumer_stream_free(struct lttng_consumer_stream *stream)
 }
 
 /*
- * Destroy a stream completely. This will delete, close and free the stream.
- * Once return, the stream is NO longer usable. Its channel may get destroyed
- * if conditions are met.
- *
- * This MUST be called WITHOUT the consumer data and stream lock acquired.
+ * Destroy the stream's buffers of the tracer.
  */
-void consumer_stream_destroy(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
 {
-       struct lttng_consumer_channel *free_chan = NULL;
+       assert(stream);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_del_stream(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+       }
+}
 
+/*
+ * Destroy a stream in no monitor mode.
+ *
+ * We need a separate function because this can be called inside a destroy
+ * channel path which have the consumer data lock acquired. Also, in no monitor
+ * mode, the channel refcount is NOT incremented per stream since the ownership
+ * of those streams are INSIDE the channel making the lazy destroy channel not
+ * possible for a non monitor stream.
+ *
+ * Furthermore, there is no need to delete the stream from the global hash
+ * table so we avoid useless calls.
+ */
+static void destroy_no_monitor(struct lttng_consumer_stream *stream)
+{
        assert(stream);
 
-       DBG("Consumer stream destroy - wait_fd: %d", stream->wait_fd);
+       DBG("Consumer stream destroy unmonitored key: %" PRIu64, stream->key);
+
+       /* Destroy tracer buffers of the stream. */
+       consumer_stream_destroy_buffers(stream);
+       /* Close down everything including the relayd if one. */
+       consumer_stream_close(stream);
+}
 
-       pthread_mutex_lock(&consumer_data.lock);
-       pthread_mutex_lock(&stream->lock);
+/*
+ * Destroy a stream in monitor mode.
+ */
+static void destroy_monitor(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
+{
+       assert(stream);
+
+       DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
 
        /* Remove every reference of the stream in the consumer. */
        consumer_stream_delete(stream, ht);
-
+       /* Destroy tracer buffers of the stream. */
+       consumer_stream_destroy_buffers(stream);
        /* Close down everything including the relayd if one. */
        consumer_stream_close(stream);
+}
 
-       /* Update refcount of channel and see if we need to destroy it. */
-       if (!uatomic_sub_return(&stream->chan->refcount, 1)
-                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
-               free_chan = stream->chan;
-       }
+/*
+ * Destroy a stream completely. This will delete, close and free the stream.
+ * Once return, the stream is NO longer usable. Its channel may get destroyed
+ * if conditions are met for a monitored stream.
+ *
+ * This MUST be called WITHOUT the consumer data and stream lock acquired if
+ * the stream is in _monitor_ mode else it does not matter.
+ */
+void consumer_stream_destroy(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
+{
+       assert(stream);
+
+       /* Stream is in monitor mode. */
+       if (stream->chan->monitor) {
+               struct lttng_consumer_channel *free_chan = NULL;
 
-       /* Indicates that the consumer data state MUST be updated after this. */
-       consumer_data.need_update = 1;
+               pthread_mutex_lock(&consumer_data.lock);
+               pthread_mutex_lock(&stream->lock);
 
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&consumer_data.lock);
+               destroy_monitor(stream, ht);
 
-       if (free_chan) {
-               consumer_del_channel(free_chan);
+               /* Update refcount of channel and see if we need to destroy it. */
+               if (!uatomic_sub_return(&stream->chan->refcount, 1)
+                               && !uatomic_read(&stream->chan->nb_init_stream_left)) {
+                       free_chan = stream->chan;
+               }
+
+               /* Indicates that the consumer data state MUST be updated after this. */
+               consumer_data.need_update = 1;
+
+               pthread_mutex_unlock(&stream->lock);
+               pthread_mutex_unlock(&consumer_data.lock);
+
+               if (free_chan) {
+                       consumer_del_channel(free_chan);
+               }
+       } else {
+               /*
+                * No monitor mode the stream's ownership is in its channel thus we
+                * don't have to handle the channel refcount nor the lazy deletion.
+                */
+               destroy_no_monitor(stream);
        }
 
        /* Free stream within a RCU call. */
index 3036b2c77e6512fd61b4128e131b21e9d24fabd3..3096e1e7801d535084bfb3fd7ef076d0d11237a9 100644 (file)
@@ -62,4 +62,10 @@ void consumer_stream_free(struct lttng_consumer_stream *stream);
 void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
 
+/*
+ * Destroy the stream's buffers on the tracer side. This is also called in a
+ * stream destroy.
+ */
+void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream);
+
 #endif /* LTTNG_CONSUMER_STREAM_H */
index 1f96fa9d460c5a97eac61a9c79acadbf239a7e91..0005b1039cb27d6ced5dfac3537e72bbf737fce0 100644 (file)
@@ -301,8 +301,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
                                send_node) {
                        cds_list_del(&stream->send_node);
-                       lttng_ustconsumer_del_stream(stream);
-                       free(stream);
+                       /*
+                        * Once a stream is added to this list, the buffers were created so
+                        * we have a guarantee that this call will succeed.
+                        */
+                       consumer_stream_destroy(stream, NULL);
                }
                lttng_ustconsumer_del_channel(channel);
                break;
@@ -312,25 +315,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                goto end;
        }
 
-       /* Empty no monitor streams list. */
-       if (!channel->monitor) {
-               struct lttng_consumer_stream *stream, *stmp;
-
-               /*
-                * So, these streams are not visible to any data thread. This is why we
-                * close and free them because they were never added to any data
-                * structure apart from this one.
-                */
-               cds_list_for_each_entry_safe(stream, stmp,
-                               &channel->stream_no_monitor_list.head, no_monitor_node) {
-                       cds_list_del(&stream->no_monitor_node);
-                       /* Close everything in that stream. */
-                       consumer_stream_close(stream);
-                       /* Free the ressource. */
-                       consumer_stream_free(stream);
-               }
-       }
-
        rcu_read_lock();
        iter.iter.node = &channel->node.node;
        ret = lttng_ht_del(consumer_data.channel_ht, &iter);
@@ -693,6 +677,66 @@ error:
        return relayd;
 }
 
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
+               char *path)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(stream->net_seq_idx != -1ULL);
+       assert(path);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_add_stream(&relayd->control_sock, stream->name,
+                               path, &stream->relayd_stream_id,
+                               stream->chan->tracefile_size, stream->chan->tracefile_count);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+               uatomic_inc(&relayd->refcount);
+       } else {
+               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+                               stream->key, stream->net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       stream->name, stream->key, stream->net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+       struct consumer_relayd_sock_pair *relayd;
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               consumer_stream_relayd_close(stream, relayd);
+       }
+       rcu_read_unlock();
+}
+
 /*
  * Handle stream for relayd transmission if the stream applies for network
  * streaming where the net sequence index is set.
@@ -816,7 +860,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->wait_fd = -1;
 
        CDS_INIT_LIST_HEAD(&channel->streams.head);
-       CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head);
 
        DBG("Allocated channel (key %" PRIu64 ")", channel->key)
 
@@ -856,7 +899,7 @@ end:
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (!ret && channel->wait_fd != -1 &&
-                       channel->metadata_stream == NULL) {
+                       channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
        return ret;
@@ -2744,7 +2787,6 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
-                               assert(cds_list_empty(&chan->streams.head));
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
index fa503ea111836c18335eb7863123275daa2a6edc..4cf6e910d5ef44ad2ba1967b897458632aca9e10 100644 (file)
@@ -140,12 +140,6 @@ struct lttng_consumer_channel {
         */
        struct stream_list streams;
 
-       /*
-        * List of streams in no monitor mode for this channel. Used ONLY for
-        * snapshots recording.
-        */
-       struct stream_list stream_no_monitor_list;
-
        /*
         * Set if the channel is metadata. We keep a reference to the stream
         * because we have to flush data once pushed by the session daemon. For a
@@ -265,9 +259,6 @@ struct lttng_consumer_stream {
        /* On-disk circular buffer */
        uint64_t tracefile_size_current;
        uint64_t tracefile_count_current;
-
-       /* Node for the no monitor stream list in a channel. */
-       struct cds_list_head no_monitor_node;
 };
 
 /*
@@ -512,6 +503,8 @@ void consumer_del_channel(struct lttng_consumer_channel *channel);
 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
                uint64_t net_seq_idx);
 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path);
+void close_relayd_stream(struct lttng_consumer_stream *stream);
 struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                size_t data_size);
index 02198bd1b9e854f26a8614c5f76fbf76a36835f1..a6306291af17f153a35c56aa300a4bd51c3ed03b 100644 (file)
@@ -104,72 +104,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-/*
- * Find a relayd and send the stream
- *
- * Returns 0 on success, < 0 on error
- */
-static int send_relayd_stream(struct lttng_consumer_stream *stream,
-               char *path)
-{
-       int ret = 0;
-       const char *stream_path;
-       struct consumer_relayd_sock_pair *relayd;
-
-       assert(stream);
-       assert(stream->net_seq_idx != -1ULL);
-
-       if (path != NULL) {
-               stream_path = path;
-       } else {
-               stream_path = stream->chan->pathname;
-       }
-
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               /* Add stream on the relayd */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_add_stream(&relayd->control_sock, stream->name,
-                               stream_path, &stream->relayd_stream_id,
-                               stream->chan->tracefile_size, stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto end;
-               }
-               uatomic_inc(&relayd->refcount);
-       } else {
-               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
-                               stream->key, stream->net_seq_idx);
-               ret = -1;
-               goto end;
-       }
-
-       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
-                       stream->name, stream->key, stream->net_seq_idx);
-
-end:
-       rcu_read_unlock();
-       return ret;
-}
-
-/*
- * Find a relayd and close the stream
- */
-static void close_relayd_stream(struct lttng_consumer_stream *stream)
-{
-       struct consumer_relayd_sock_pair *relayd;
-
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd) {
-               consumer_stream_relayd_close(stream, relayd);
-       }
-       rcu_read_unlock();
-}
-
 /*
  * Take a snapshot of all the stream of a channel
  *
@@ -201,8 +135,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                goto end;
        }
 
-       cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
-                       no_monitor_node) {
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /*
                 * Lock stream because we are about to change its state.
                 */
@@ -215,14 +148,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
-                       ret = send_relayd_stream(stream, path);
+                       ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
                                goto end_unlock;
                        }
                } else {
                        ret = utils_create_stream_file(path, stream->name,
-                                       stream->chan->tracefile_size, stream->tracefile_count_current,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current,
                                        stream->uid, stream->gid);
                        if (ret < 0) {
                                ERR("utils_create_stream_file");
@@ -238,7 +172,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
-                       ERR("Failed to flush kernel metadata stream");
+                       ERR("Failed to flush kernel stream");
                        goto end_unlock;
                }
 
@@ -391,7 +325,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        }
 
        if (use_relayd) {
-               ret = send_relayd_stream(metadata_stream, path);
+               ret = consumer_send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
                        goto error;
                }
@@ -685,12 +619,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        DBG("Kernel consumer add stream %s in no monitor mode with "
                                        "relayd id %" PRIu64, new_stream->name,
                                        new_stream->net_seq_idx);
-                       cds_list_add(&new_stream->no_monitor_node,
-                                       &channel->stream_no_monitor_list.head);
+                       cds_list_add(&new_stream->send_node, &channel->streams.head);
                        break;
                }
 
-               ret = send_relayd_stream(new_stream, NULL);
+               ret = consumer_send_relayd_stream(new_stream, NULL);
                if (ret < 0) {
                        consumer_stream_free(new_stream);
                        goto end_nosignal;
index 18eb507dec003a08381f6c6a17f6da9997769850..f318af153ca3aca1dcdd1573590d7447de61490c 100644 (file)
@@ -37,6 +37,7 @@
 #include <common/relayd/relayd.h>
 #include <common/compat/fcntl.h>
 #include <common/consumer-metadata-cache.h>
+#include <common/consumer-stream.h>
 #include <common/consumer-timer.h>
 #include <common/utils.h>
 
@@ -284,7 +285,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                 * Increment channel refcount since the channel reference has now been
                 * assigned in the allocation process above.
                 */
-               uatomic_inc(&stream->chan->refcount);
+               if (stream->chan->monitor) {
+                       uatomic_inc(&stream->chan->refcount);
+               }
 
                /*
                 * Order is important this is why a list is used. On error, the caller
@@ -503,18 +506,27 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
        /* The reply msg status is handled in the following call. */
        ret = create_ust_channel(attr, &channel->uchan);
        if (ret < 0) {
-               goto error;
+               goto end;
        }
 
        channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
 
+       /*
+        * For the snapshots (no monitor), we create the metadata streams
+        * on demand, not during the channel creation.
+        */
+       if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
+               ret = 0;
+               goto end;
+       }
+
        /* Open all streams for this channel. */
        ret = create_ust_streams(channel, ctx);
        if (ret < 0) {
-               goto error;
+               goto end;
        }
 
-error:
+end:
        return ret;
 }
 
@@ -694,7 +706,17 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        if (!metadata) {
                ERR("UST consumer push metadata %" PRIu64 " not found", key);
                ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-               goto error_find;
+               goto end;
+       }
+
+       /*
+        * In no monitor mode, the metadata channel has no stream(s) so skip the
+        * ownership transfer to the metadata thread.
+        */
+       if (!metadata->monitor) {
+               DBG("Metadata channel in no monitor");
+               ret = 0;
+               goto end;
        }
 
        /*
@@ -726,7 +748,8 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        /* List MUST be empty after or else it could be reused. */
        assert(cds_list_empty(&metadata->streams.head));
 
-       return 0;
+       ret = 0;
+       goto end;
 
 error:
        /*
@@ -736,7 +759,268 @@ error:
         * will make sure to clean that list.
         */
        consumer_del_channel(metadata);
-error_find:
+end:
+       return ret;
+}
+
+/*
+ * Snapshot the whole metadata.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+       ssize_t write_len;
+       uint64_t total_len = 0;
+       struct lttng_consumer_channel *metadata_channel;
+       struct lttng_consumer_stream *metadata_stream;
+
+       assert(path);
+       assert(ctx);
+
+       DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
+                       key, path);
+
+       rcu_read_lock();
+
+       metadata_channel = consumer_find_channel(key);
+       if (!metadata_channel) {
+               ERR("UST snapshot metadata channel not found for key %lu", key);
+               ret = -1;
+               goto error;
+       }
+       assert(!metadata_channel->monitor);
+
+       /*
+        * Ask the sessiond if we have new metadata waiting and update the
+        * consumer metadata cache.
+        */
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /*
+        * The metadata stream is NOT created in no monitor mode when the channel
+        * is created on a sessiond ask channel command.
+        */
+       ret = create_ust_streams(metadata_channel, ctx);
+       if (ret < 0) {
+               goto error;
+       }
+
+       metadata_stream = metadata_channel->metadata_stream;
+       assert(metadata_stream);
+
+       if (relayd_id != (uint64_t) -1ULL) {
+               metadata_stream->net_seq_idx = relayd_id;
+               ret = consumer_send_relayd_stream(metadata_stream, path);
+               if (ret < 0) {
+                       goto error_stream;
+               }
+       } else {
+               ret = utils_create_stream_file(path, metadata_stream->name,
+                               metadata_stream->chan->tracefile_size,
+                               metadata_stream->tracefile_count_current,
+                               metadata_stream->uid, metadata_stream->gid);
+               if (ret < 0) {
+                       goto error_stream;
+               }
+               metadata_stream->out_fd = ret;
+               metadata_stream->tracefile_size_current = 0;
+       }
+
+       pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
+       while (total_len < metadata_channel->metadata_cache->total_bytes_written) {
+               /*
+                * Write at most one packet of metadata into the channel
+                * to avoid blocking here.
+                */
+               write_len = ustctl_write_one_packet_to_channel(metadata_channel->uchan,
+                               metadata_channel->metadata_cache->data,
+                               metadata_channel->metadata_cache->total_bytes_written);
+               if (write_len < 0) {
+                       ERR("UST consumer snapshot writing metadata packet");
+                       ret = -1;
+                       goto error_unlock;
+               }
+               total_len += write_len;
+
+               DBG("Written %" PRIu64 " bytes to metadata (left: %" PRIu64 ")",
+                               write_len,
+                               metadata_channel->metadata_cache->total_bytes_written - write_len);
+               ustctl_flush_buffer(metadata_stream->ustream, 1);
+               ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
+               if (ret < 0) {
+                       goto error_unlock;
+               }
+       }
+
+error_unlock:
+       pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
+
+error_stream:
+       /*
+        * Clean up the stream completly because the next snapshot will use a new
+        * metadata stream.
+        */
+       cds_list_del(&metadata_stream->send_node);
+       consumer_stream_destroy(metadata_stream, NULL);
+       metadata_channel->metadata_stream = NULL;
+
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Take a snapshot of all the stream of a channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       unsigned use_relayd = 0;
+       unsigned long consumed_pos, produced_pos;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+
+       assert(path);
+       assert(ctx);
+
+       rcu_read_lock();
+
+       if (relayd_id != (uint64_t) -1ULL) {
+               use_relayd = 1;
+       }
+
+       channel = consumer_find_channel(key);
+       if (!channel) {
+               ERR("UST snapshot channel not found for key %lu", key);
+               ret = -1;
+               goto error;
+       }
+       assert(!channel->monitor);
+       DBG("UST consumer snapshot channel %lu", key);
+
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               /* Lock stream because we are about to change its state. */
+               pthread_mutex_lock(&stream->lock);
+               stream->net_seq_idx = relayd_id;
+
+               if (use_relayd) {
+                       ret = consumer_send_relayd_stream(stream, path);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+               } else {
+                       ret = utils_create_stream_file(path, stream->name,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current,
+                                       stream->uid, stream->gid);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+                       stream->out_fd = ret;
+                       stream->tracefile_size_current = 0;
+
+                       DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
+                                       stream->name, stream->key);
+               }
+
+               ustctl_flush_buffer(stream->ustream, 1);
+
+               ret = lttng_ustconsumer_take_snapshot(stream);
+               if (ret < 0) {
+                       ERR("Taking UST snapshot");
+                       goto error_unlock;
+               }
+
+               ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
+               if (ret < 0) {
+                       ERR("Produced UST snapshot position");
+                       goto error_unlock;
+               }
+
+               ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
+               if (ret < 0) {
+                       ERR("Consumerd UST snapshot position");
+                       goto error_unlock;
+               }
+
+               while (consumed_pos < produced_pos) {
+                       ssize_t read_len;
+                       unsigned long len, padded_len;
+
+                       DBG("UST consumer taking snapshot at pos %lu", consumed_pos);
+
+                       ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
+                       if (ret < 0) {
+                               if (ret != -EAGAIN) {
+                                       PERROR("ustctl_get_subbuf snapshot");
+                                       goto error_close_stream;
+                               }
+                               DBG("UST consumer get subbuf failed. Skipping it.");
+                               consumed_pos += stream->max_sb_size;
+                               continue;
+                       }
+
+                       ret = ustctl_get_subbuf_size(stream->ustream, &len);
+                       if (ret < 0) {
+                               ERR("Snapshot ustctl_get_subbuf_size");
+                               goto error_put_subbuf;
+                       }
+
+                       ret = ustctl_get_padded_subbuf_size(stream->ustream, &padded_len);
+                       if (ret < 0) {
+                               ERR("Snapshot ustctl_get_padded_subbuf_size");
+                               goto error_put_subbuf;
+                       }
+
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+                                       padded_len - len);
+                       if (use_relayd) {
+                               if (read_len != len) {
+                                       ret = -1;
+                                       goto error_put_subbuf;
+                               }
+                       } else {
+                               if (read_len != padded_len) {
+                                       ret = -1;
+                                       goto error_put_subbuf;
+                               }
+                       }
+
+                       ret = ustctl_put_subbuf(stream->ustream);
+                       if (ret < 0) {
+                               ERR("Snapshot ustctl_put_subbuf");
+                               goto error_close_stream;
+                       }
+                       consumed_pos += stream->max_sb_size;
+               }
+
+               /* Simply close the stream so we can use it on the next snapshot. */
+               consumer_stream_close(stream);
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       rcu_read_unlock();
+       return 0;
+
+error_put_subbuf:
+       if (ustctl_put_subbuf(stream->ustream) < 0) {
+               ERR("Snapshot ustctl_put_subbuf");
+       }
+error_close_stream:
+       consumer_stream_close(stream);
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+error:
+       rcu_read_unlock();
        return ret;
 }
 
@@ -940,6 +1224,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
                attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
                attr.chan_id = msg.u.ask_channel.chan_id;
+               attr.output = msg.u.ask_channel.output;
                memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
 
                /* Translate and save channel type. */
@@ -1043,6 +1328,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto error_fatal;
                }
 
+               /*
+                * In no monitor mode, the streams ownership is kept inside the channel
+                * so don't send them to the data thread.
+                */
+               if (!channel->monitor) {
+                       goto end_msg_sessiond;
+               }
+
                ret = send_streams_to_thread(channel, ctx);
                if (ret < 0) {
                        /*
@@ -1053,7 +1346,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                /* List MUST be empty after or else it could be reused. */
                assert(cds_list_empty(&channel->streams.head));
-
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
@@ -1142,6 +1434,26 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
        {
+               if (msg.u.snapshot_channel.metadata) {
+                       ret = snapshot_metadata(msg.u.snapshot_channel.key,
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id,
+                                       ctx);
+                       if (ret < 0) {
+                               ERR("Snapshot metadata failed");
+                               ret_code = LTTNG_ERR_UST_META_FAIL;
+                       }
+               } else {
+                       ret = snapshot_channel(msg.u.snapshot_channel.key,
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id,
+                                       ctx);
+                       if (ret < 0) {
+                               ERR("Snapshot channel failed");
+                               ret_code = LTTNG_ERR_UST_CHAN_FAIL;
+                       }
+               }
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -1249,6 +1561,21 @@ int lttng_ustconsumer_get_produced_snapshot(
        return ustctl_snapshot_get_produced(stream->ustream, pos);
 }
 
+/*
+ * Get the consumed position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_get_consumed_snapshot(
+               struct lttng_consumer_stream *stream, unsigned long *pos)
+{
+       assert(stream);
+       assert(stream->ustream);
+       assert(pos);
+
+       return ustctl_snapshot_get_consumed(stream->ustream, pos);
+}
+
 /*
  * Called when the stream signal the consumer that it has hang up.
  */
@@ -1379,8 +1706,10 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
+       assert(stream);
+
        /* Don't create anything if this is set for streaming. */
-       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+       if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid);
index 9f0b22ddeffa7e775503957b253e098ea535b490..191cdabf8c2569211a665cd47e0a072ede574408 100644 (file)
@@ -30,6 +30,8 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream);
 
 int lttng_ustconsumer_get_produced_snapshot(
                struct lttng_consumer_stream *stream, unsigned long *pos);
+int lttng_ustconsumer_get_consumed_snapshot(
+               struct lttng_consumer_stream *stream, unsigned long *pos);
 
 int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
This page took 0.04633 seconds and 4 git commands to generate.