Move metadata creation into lttng-sessiond and lttng-consumed
authorDavid Goulet <dgoulet@efficios.com>
Tue, 26 Feb 2013 17:46:09 +0000 (12:46 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 1 Mar 2013 18:05:33 +0000 (13:05 -0500)
The metadata is completely generated in the session daemon and pushed to
the consumer which writes the full string to a ustctl channel object. On
each event/channel notification from the tracer, the metadata is
generated on the spot and kept in the UST registry of the session
daemon.

On stop and destroy command, the metadata is pushed to the consumer. On
application unregistration as well, the metadata is handled.

Needs to be use in locked-step with LTTng-UST:
"Move metadata creation into lttng-sessiond and lttng-consumed"

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
21 files changed:
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/bin/lttng-sessiond/ust-consumer.c
src/bin/lttng-sessiond/ust-consumer.h
src/bin/lttng-sessiond/ust-metadata.c
src/bin/lttng-sessiond/ust-registry.h
src/bin/lttng-sessiond/ust-thread.c
src/common/consumer.c
src/common/consumer.h
src/common/hashtable/hashtable.c
src/common/hashtable/hashtable.h
src/common/hashtable/utils.c
src/common/hashtable/utils.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index a47c504c718736e86425bd4c58d95134ec4f8b9c..1a8fbba1eea75f30f4559274a3790f1e1ba599cc 100644 (file)
 /*
  * Used to keep a unique index for each relayd socket created where this value
  * is associated with streams on the consumer so it can match the right relayd
- * to send to.
- *
- * This value should be incremented atomically for safety purposes and future
- * possible concurrent access.
+ * to send to. It must be accessed with the relayd_net_seq_idx_lock
+ * held.
  */
-static unsigned int relayd_net_seq_idx;
+static pthread_mutex_t relayd_net_seq_idx_lock = PTHREAD_MUTEX_INITIALIZER;
+static uint64_t relayd_net_seq_idx;
 
 /*
  * Create a session path used by list_lttng_sessions for the case that the
@@ -566,15 +565,15 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
        }
 
        /* Set the network sequence index if not set. */
-       if (consumer->net_seq_index == -1) {
+       if (consumer->net_seq_index == (uint64_t) -1ULL) {
+               pthread_mutex_lock(&relayd_net_seq_idx_lock);
                /*
                 * Increment net_seq_idx because we are about to transfer the
                 * new relayd socket to the consumer.
+                * Assign unique key so the consumer can match streams.
                 */
-               uatomic_inc(&relayd_net_seq_idx);
-               /* Assign unique key so the consumer can match streams */
-               uatomic_set(&consumer->net_seq_index,
-                               uatomic_read(&relayd_net_seq_idx));
+               consumer->net_seq_index = ++relayd_net_seq_idx;
+               pthread_mutex_unlock(&relayd_net_seq_idx_lock);
        }
 
        /* Send relayd socket to consumer. */
@@ -2136,10 +2135,12 @@ error:
 void cmd_init(void)
 {
        /*
-        * Set network sequence index to 1 for streams to match a relayd socket on
-        * the consumer side.
+        * Set network sequence index to 1 for streams to match a relayd
+        * socket on the consumer side.
         */
-       uatomic_set(&relayd_net_seq_idx, 1);
+       pthread_mutex_lock(&relayd_net_seq_idx_lock);
+       relayd_net_seq_idx = 1;
+       pthread_mutex_unlock(&relayd_net_seq_idx_lock);
 
        DBG("Command subsystem initialized");
 }
index ff5360aae755aa082ec6517cb6c4f10df99c9d9d..92abcf21d07257ce4cfa83ad44dbf79bd833508d 100644 (file)
@@ -23,6 +23,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <inttypes.h>
 
 #include <common/common.h>
 #include <common/defaults.h>
@@ -76,7 +77,7 @@ end:
  * negative value is sent back and both parameters are untouched.
  */
 int consumer_recv_status_channel(struct consumer_socket *sock,
-               unsigned long *key, unsigned int *stream_count)
+               uint64_t *key, unsigned int *stream_count)
 {
        int ret;
        struct lttcomm_consumer_status_channel reply;
@@ -359,7 +360,7 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type)
        /* By default, consumer output is enabled */
        output->enabled = 1;
        output->type = type;
-       output->net_seq_index = -1;
+       output->net_seq_index = (uint64_t) -1ULL;
 
        output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 
@@ -622,8 +623,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
-               unsigned long key,
+               uint64_t relayd_id,
+               uint64_t key,
                unsigned char *uuid)
 {
        assert(msg);
@@ -660,12 +661,12 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
  */
 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
+               uint64_t channel_key,
                uint64_t session_id,
                const char *pathname,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                const char *name,
                unsigned int nb_init_streams,
                enum lttng_event_output output,
@@ -700,8 +701,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
  */
 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
-               int stream_key,
+               uint64_t channel_key,
+               uint64_t stream_key,
                int cpu)
 {
        assert(msg);
@@ -758,7 +759,7 @@ error:
  */
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type, unsigned int session_id)
+               enum lttng_stream_type type, uint64_t session_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -865,7 +866,7 @@ error:
  * This function has a different behavior with the consumer i.e. that it waits
  * for a reply from the consumer if yes or no the data is pending.
  */
-int consumer_is_data_pending(unsigned int id,
+int consumer_is_data_pending(uint64_t session_id,
                struct consumer_output *consumer)
 {
        int ret;
@@ -878,9 +879,9 @@ int consumer_is_data_pending(unsigned int id,
 
        msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
 
-       msg.u.data_pending.session_id = (uint64_t) id;
+       msg.u.data_pending.session_id = session_id;
 
-       DBG3("Consumer data pending for id %u", id);
+       DBG3("Consumer data pending for id %" PRIu64, session_id);
 
        /* Send command for each consumer */
        rcu_read_lock();
@@ -924,8 +925,8 @@ int consumer_is_data_pending(unsigned int id,
        }
        rcu_read_unlock();
 
-       DBG("Consumer data is %s pending for session id %u",
-                       ret_code == 1 ? "" : "NOT", id);
+       DBG("Consumer data is %s pending for session id %" PRIu64,
+                       ret_code == 1 ? "" : "NOT", session_id);
        return ret_code;
 
 error_unlock:
index af337baa386b61f81e587609a03d3e7ae800d1a2..3616d467cd4e76e65246ce5c2e07cbf80e75434e 100644 (file)
@@ -126,7 +126,7 @@ struct consumer_output {
         * side. It tells the consumer which streams goes to which relayd with this
         * index. The relayd sockets are index with it on the consumer side.
         */
-       int net_seq_index;
+       uint64_t net_seq_index;
 
        /*
         * Subdirectory path name used for both local and network consumer.
@@ -170,12 +170,12 @@ int consumer_send_channel(struct consumer_socket *sock,
                struct lttcomm_consumer_msg *msg);
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type, unsigned int session_id);
+               enum lttng_stream_type type, uint64_t session_id);
 int consumer_send_destroy_relayd(struct consumer_socket *sock,
                struct consumer_output *consumer);
 int consumer_recv_status_reply(struct consumer_socket *sock);
 int consumer_recv_status_channel(struct consumer_socket *sock,
-               unsigned long *key, unsigned int *stream_count);
+               uint64_t *key, unsigned int *stream_count);
 void consumer_output_send_destroy_relayd(struct consumer_output *consumer);
 int consumer_create_socket(struct consumer_data *data,
                struct consumer_output *output);
@@ -195,27 +195,27 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
-               unsigned long key,
+               uint64_t relayd_id,
+               uint64_t key,
                unsigned char *uuid);
 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
-               int stream_key,
+               uint64_t channel_key,
+               uint64_t stream_key,
                int cpu);
 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
+               uint64_t channel_key,
                uint64_t session_id,
                const char *pathname,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                const char *name,
                unsigned int nb_init_streams,
                enum lttng_event_output output,
                int type);
-int consumer_is_data_pending(unsigned int id,
+int consumer_is_data_pending(uint64_t session_id,
                struct consumer_output *consumer);
 
 #endif /* _CONSUMER_H */
index 2698fb46cb47e2d76ce1f0f22adba8155620f4a5..f7bb53ef7ead038e479964b6e11bb2259eeb1cd7 100644 (file)
@@ -1244,41 +1244,25 @@ error_testpoint:
 }
 
 /*
- * Send the application sockets (cmd and notify) to the respective threads.
- * This is called from the dispatch UST registration thread once all sockets
- * are set for the application.
+ * Send a socket to a thread This is called from the dispatch UST registration
+ * thread once all sockets are set for the application.
  *
  * On success, return 0 else a negative value being the errno message of the
  * write().
  */
-static int send_app_sockets_to_threads(struct ust_app *app)
+static int send_socket_to_thread(int fd, int sock)
 {
        int ret;
 
-       assert(app);
        /* Sockets MUST be set or else this should not have been called. */
-       assert(app->sock >= 0);
-       assert(app->notify_sock >= 0);
-       assert(apps_cmd_pipe[1] >= 0);
-       assert(apps_cmd_notify_pipe[1] >= 0);
+       assert(fd >= 0);
+       assert(sock >= 0);
 
        do {
-               ret = write(apps_cmd_pipe[1], &app->sock, sizeof(app->sock));
+               ret = write(fd, &sock, sizeof(sock));
        } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != sizeof(app->sock)) {
-               PERROR("write apps cmd pipe %d", apps_cmd_pipe[1]);
-               if (ret < 0) {
-                       ret = -errno;
-               }
-               goto error;
-       }
-
-       do {
-               ret = write(apps_cmd_notify_pipe[1], &app->notify_sock,
-                               sizeof(app->notify_sock));
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != sizeof(app->notify_sock)) {
-               PERROR("write apps notify cmd pipe %d", apps_cmd_notify_pipe[1]);
+       if (ret < 0 || ret != sizeof(sock)) {
+               PERROR("write apps pipe %d", fd);
                if (ret < 0) {
                        ret = -errno;
                }
@@ -1303,7 +1287,7 @@ static void *thread_dispatch_ust_registration(void *data)
        struct {
                struct ust_app *app;
                struct cds_list_head head;
-       } *wait_node = NULL;
+       } *wait_node = NULL, *tmp_wait_node;
 
        CDS_LIST_HEAD(wait_queue);
 
@@ -1349,6 +1333,8 @@ static void *thread_dispatch_ust_registration(void *data)
                                        if (ret < 0) {
                                                PERROR("close ust sock dispatch %d", ust_cmd->sock);
                                        }
+                                       lttng_fd_put(1, LTTNG_FD_APPS);
+                                       free(wait_node);
                                        continue;
                                }
                                /*
@@ -1368,7 +1354,8 @@ static void *thread_dispatch_ust_registration(void *data)
                                 * Look for the application in the local wait queue and set the
                                 * notify socket if found.
                                 */
-                               cds_list_for_each_entry(wait_node, &wait_queue, head) {
+                               cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+                                               &wait_queue, head) {
                                        if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
                                                wait_node->app->notify_sock = ust_cmd->sock;
                                                cds_list_del(&wait_node->head);
@@ -1381,10 +1368,6 @@ static void *thread_dispatch_ust_registration(void *data)
                        }
 
                        if (app) {
-                               ret = send_app_sockets_to_threads(app);
-                               if (ret < 0) {
-                                       goto error;
-                               }
                                /*
                                 * @session_lock_list
                                 *
@@ -1394,29 +1377,52 @@ static void *thread_dispatch_ust_registration(void *data)
                                 */
                                session_lock_list();
                                rcu_read_lock();
+
                                /*
                                 * Add application to the global hash table. This needs to be
                                 * done before the update to the UST registry can locate the
                                 * application.
                                 */
                                ust_app_add(app);
-                               /*
-                                * Get app version.
-                                */
-                               ret = ust_app_version(app);
-                               if (ret) {
-                                       ERR("Unable to get app version");
+
+                               /* Set app version. This call will print an error if needed. */
+                               (void) ust_app_version(app);
+
+                               /* Send notify socket through the notify pipe. */
+                               ret = send_socket_to_thread(apps_cmd_notify_pipe[1],
+                                               app->notify_sock);
+                               if (ret < 0) {
+                                       rcu_read_unlock();
+                                       session_unlock_list();
+                                       /* No notify thread, stop the UST tracing. */
+                                       goto error;
                                }
+
                                /*
                                 * Update newly registered application with the tracing
                                 * registry info already enabled information.
                                 */
                                update_ust_app(app->sock);
-                               ret = ust_app_register_done(app->sock);
+
+                               /*
+                                * Don't care about return value. Let the manage apps threads
+                                * handle app unregistration upon socket close.
+                                */
+                               (void) ust_app_register_done(app->sock);
+
+                               /*
+                                * Even if the application socket has been closed, send the app
+                                * to the thread and unregistration will take place at that
+                                * place.
+                                */
+                               ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock);
                                if (ret < 0) {
-                                       /* Remove application from the registry. */
-                                       ust_app_unregister(app->sock);
+                                       rcu_read_unlock();
+                                       session_unlock_list();
+                                       /* No apps. thread, stop the UST tracing. */
+                                       goto error;
                                }
+
                                rcu_read_unlock();
                                session_unlock_list();
                        } else {
@@ -1425,6 +1431,7 @@ static void *thread_dispatch_ust_registration(void *data)
                                if (ret < 0) {
                                        PERROR("close ust_cmd sock");
                                }
+                               lttng_fd_put(1, LTTNG_FD_APPS);
                        }
                        free(ust_cmd);
                } while (node != NULL);
@@ -1434,6 +1441,13 @@ static void *thread_dispatch_ust_registration(void *data)
        }
 
 error:
+       /* Clean up wait queue. */
+       cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+                       &wait_queue, head) {
+               cds_list_del(&wait_node->head);
+               free(wait_node);
+       }
+
        DBG("Dispatch thread dying");
        return NULL;
 }
@@ -1561,6 +1575,7 @@ static void *thread_registration_apps(void *data)
                                                sock = -1;
                                                continue;
                                        }
+
                                        health_code_update();
                                        ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg);
                                        if (ret < 0) {
index 371f1600bb16a9eebcdc96c211e1dd9a96471b3f..72887633c36ce31b662c92c9240a164921bc6973 100644 (file)
@@ -73,6 +73,10 @@ static struct consumer_socket *find_consumer_socket_by_bitness(int bits,
        }
 
        socket = consumer_find_socket(consumer_fd, consumer);
+       if (!socket) {
+               ERR("Consumer socket fd %d not found in consumer obj %p",
+                               consumer_fd, consumer);
+       }
 
 end:
        return socket;
@@ -164,6 +168,28 @@ static void add_unique_ust_app_event(struct ust_app_channel *ua_chan,
        assert(node_ptr == &event->node.node);
 }
 
+/*
+ * Close the notify socket from the given RCU head object. This MUST be called
+ * through a call_rcu().
+ */
+static void close_notify_sock_rcu(struct rcu_head *head)
+{
+       int ret;
+       struct ust_app_notify_sock_obj *obj =
+               caa_container_of(head, struct ust_app_notify_sock_obj, head);
+
+       /* Must have a valid fd here. */
+       assert(obj->fd >= 0);
+
+       ret = close(obj->fd);
+       if (ret) {
+               ERR("close notify sock %d RCU", obj->fd);
+       }
+       lttng_fd_put(LTTNG_FD_APPS, 1);
+
+       free(obj);
+}
+
 /*
  * Delete ust context safely. RCU read lock must be held before calling
  * this function.
@@ -292,6 +318,146 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
        free(ua_chan);
 }
 
+/*
+ * For a given application and session, push metadata to consumer. The session
+ * lock MUST be acquired here before calling this.
+ *
+ * Return 0 on success else a negative error.
+ */
+static int push_metadata(struct ust_app *app, struct ust_app_session *ua_sess)
+{
+       int ret;
+       char *metadata_str = NULL;
+       size_t len, offset;
+       struct consumer_socket *socket;
+
+       assert(app);
+       assert(ua_sess);
+
+       if (!ua_sess->consumer || !ua_sess->metadata) {
+               /* No consumer means no stream associated so just return gracefully. */
+               ret = 0;
+               goto end;
+       }
+
+       rcu_read_lock();
+
+       /* Get consumer socket to use to push the metadata.*/
+       socket = find_consumer_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ret = -1;
+               goto error_rcu_unlock;
+       }
+
+       /*
+        * TODO: Currently, we hold the socket lock around sampling of the next
+        * metadata segment to ensure we send metadata over the consumer socket in
+        * the correct order. This makes the registry lock nest inside the socket
+        * lock.
+        *
+        * Please note that this is a temporary measure: we should move this lock
+        * back into ust_consumer_push_metadata() when the consumer gets the
+        * ability to reorder the metadata it receives.
+        */
+       pthread_mutex_lock(socket->lock);
+       pthread_mutex_lock(&ua_sess->registry.lock);
+
+       offset = ua_sess->registry.metadata_len_sent;
+       len = ua_sess->registry.metadata_len - ua_sess->registry.metadata_len_sent;
+       if (len == 0) {
+               DBG3("No metadata to push for session id %d", ua_sess->id);
+               ret = 0;
+               goto error_reg_unlock;
+       }
+       assert(len > 0);
+
+       /* Allocate only what we have to send. */
+       metadata_str = zmalloc(len);
+       if (!metadata_str) {
+               PERROR("zmalloc ust app metadata string");
+               ret = -ENOMEM;
+               goto error_reg_unlock;
+       }
+       /* Copy what we haven't send out. */
+       memcpy(metadata_str, ua_sess->registry.metadata + offset, len);
+
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+
+       ret = ust_consumer_push_metadata(socket, ua_sess, metadata_str, len,
+                       offset);
+       if (ret < 0) {
+               pthread_mutex_unlock(socket->lock);
+               goto error_rcu_unlock;
+       }
+
+       /* Update len sent of the registry. */
+       pthread_mutex_lock(&ua_sess->registry.lock);
+       ua_sess->registry.metadata_len_sent += len;
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+       pthread_mutex_unlock(socket->lock);
+
+       rcu_read_unlock();
+       free(metadata_str);
+       return 0;
+
+error_reg_unlock:
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+       pthread_mutex_unlock(socket->lock);
+error_rcu_unlock:
+       rcu_read_unlock();
+       free(metadata_str);
+end:
+       return ret;
+}
+
+/*
+ * Send to the consumer a close metadata command for the given session. Once
+ * done, the metadata channel is deleted and the session metadata pointer is
+ * nullified. The session lock MUST be acquired here unless the application is
+ * in the destroy path.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int close_metadata(struct ust_app *app, struct ust_app_session *ua_sess)
+{
+       int ret;
+       struct consumer_socket *socket;
+
+       assert(app);
+       assert(ua_sess);
+
+       /* Ignore if no metadata. Valid since it can be called on unregister. */
+       if (!ua_sess->metadata) {
+               ret = 0;
+               goto error;
+       }
+
+       rcu_read_lock();
+
+       /* Get consumer socket to use to push the metadata.*/
+       socket = find_consumer_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ret = -1;
+               goto error_rcu_unlock;
+       }
+
+       ret = ust_consumer_close_metadata(socket, ua_sess->metadata);
+       if (ret < 0) {
+               goto error_rcu_unlock;
+       }
+
+error_rcu_unlock:
+       /* Destroy metadata on our side since we must not use it anymore. */
+       delete_ust_app_channel(-1, ua_sess->metadata, app);
+       ua_sess->metadata = NULL;
+
+       rcu_read_unlock();
+error:
+       return ret;
+}
+
 /*
  * Delete ust app session safely. RCU read lock must be held before calling
  * this function.
@@ -304,8 +470,14 @@ void delete_ust_app_session(int sock, struct ust_app_session *ua_sess,
        struct lttng_ht_iter iter;
        struct ust_app_channel *ua_chan;
 
+       assert(ua_sess);
+
        if (ua_sess->metadata) {
-               delete_ust_app_channel(sock, ua_sess->metadata, app);
+               /* Push metadata for application before freeing the application. */
+               (void) push_metadata(app, ua_sess);
+
+               /* And ask to close it for this session. */
+               (void) close_metadata(app, ua_sess);
        }
 
        cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
@@ -888,8 +1060,8 @@ error:
 }
 
 /*
- * Create the specified channel onto the UST tracer for a UST session.
- * Called with UST app session lock held.
+ * Create the specified channel onto the UST tracer for a UST session. This
+ * MUST be called with UST app session lock held.
  *
  * Return 0 on success. On error, a negative value is returned.
  */
@@ -907,6 +1079,7 @@ static int create_ust_channel(struct ust_app *app,
        assert(ua_chan);
        assert(consumer);
 
+       rcu_read_lock();
        health_code_update();
 
        /* Get the right consumer socket for the application. */
@@ -929,7 +1102,7 @@ static int create_ust_channel(struct ust_app *app,
 
        /*
         * Compute the number of fd needed before receiving them. It must be 2 per
-        * stream.
+        * stream (2 being the default value here).
         */
        nb_fd = DEFAULT_UST_STREAM_FD_NUM * ua_chan->expected_stream_count;
 
@@ -957,6 +1130,8 @@ static int create_ust_channel(struct ust_app *app,
                goto error;
        }
 
+       health_code_update();
+
        /* Send all streams to application. */
        cds_list_for_each_entry_safe(stream, stmp, &ua_chan->streams.head, list) {
                ret = ust_consumer_send_stream_to_ust(app, ua_chan, stream);
@@ -986,6 +1161,7 @@ static int create_ust_channel(struct ust_app *app,
                }
        }
 
+       rcu_read_unlock();
        return 0;
 
 error_destroy:
@@ -1000,6 +1176,7 @@ error_fd_get:
        (void) ust_consumer_destroy_channel(socket, ua_chan);
 error:
        health_code_update();
+       rcu_read_unlock();
        return ret;
 }
 
@@ -1323,10 +1500,19 @@ static int create_ust_app_session(struct ltt_ust_session *usess,
                DBG2("UST app session created successfully with handle %d", ret);
        }
 
+       /*
+        * Assign consumer if not already set. For one application, there is only
+        * one possible consumer has of now.
+        */
+       if (!ua_sess->consumer) {
+               ua_sess->consumer = usess->consumer;
+       }
+
        *ua_sess_ptr = ua_sess;
        if (is_created) {
                *is_created = created;
        }
+
        /* Everything went well. */
        ret = 0;
 
@@ -1590,9 +1776,11 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
 {
        int ret = 0;
        struct ust_app_channel *metadata;
+       struct consumer_socket *socket;
 
        assert(ua_sess);
        assert(app);
+       assert(consumer);
 
        if (ua_sess->metadata) {
                /* Already exist. Return success. */
@@ -1616,19 +1804,43 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
        metadata->attr.output = LTTNG_UST_MMAP;
        metadata->attr.type = LTTNG_UST_CHAN_METADATA;
 
-       ret = create_ust_channel(app, ua_sess, metadata, consumer);
+       /* Get the right consumer socket for the application. */
+       socket = find_consumer_socket_by_bitness(app->bits_per_long, consumer);
+       if (!socket) {
+               ret = -EINVAL;
+               goto error_consumer;
+       }
+
+       /*
+        * Ask the metadata channel creation to the consumer. The metadata object
+        * will be created by the consumer and kept their. However, the stream is
+        * never added or monitored until we do a first push metadata to the
+        * consumer.
+        */
+       ret = ust_consumer_ask_channel(ua_sess, metadata, consumer, socket);
+       if (ret < 0) {
+               goto error_consumer;
+       }
+
+       /*
+        * The setup command will make the metadata stream be sent to the relayd,
+        * if applicable, and the thread managing the metadatas. This is important
+        * because after this point, if an error occurs, the only way the stream
+        * can be deleted is to be monitored in the consumer.
+        */
+       ret = ust_consumer_setup_metadata(socket, metadata);
        if (ret < 0) {
-               goto error_create;
+               goto error_consumer;
        }
 
        ua_sess->metadata = metadata;
 
-       DBG2("UST metadata opened for app pid %d", app->pid);
+       DBG2("UST metadata created for app pid %d", app->pid);
 
 end:
        return 0;
-error_create:
-       delete_ust_app_channel(metadata->is_sent ? app->sock : -1, metadata, app);
+error_consumer:
+       delete_ust_app_channel(-1, metadata, app);
 error:
        return ret;
 }
@@ -1642,10 +1854,12 @@ struct lttng_ht *ust_app_get_ht(void)
 }
 
 /*
- * Return ust app pointer or NULL if not found.
+ * Return ust app pointer or NULL if not found. RCU read side lock MUST be
+ * acquired before calling this function.
  */
 struct ust_app *ust_app_find_by_pid(pid_t pid)
 {
+       struct ust_app *app = NULL;
        struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
 
@@ -1658,13 +1872,19 @@ struct ust_app *ust_app_find_by_pid(pid_t pid)
 
        DBG2("Found UST app by pid %d", pid);
 
-       return caa_container_of(node, struct ust_app, pid_n);
+       app = caa_container_of(node, struct ust_app, pid_n);
 
 error:
-       rcu_read_unlock();
-       return NULL;
+       return app;
 }
 
+/*
+ * Allocate and init an UST app object using the registration information and
+ * the command socket. This is called when the command socket connects to the
+ * session daemon.
+ *
+ * The object is returned on success or else NULL.
+ */
 struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock)
 {
        struct ust_app *lta = NULL;
@@ -1693,7 +1913,6 @@ struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock)
        lta->ppid = msg->ppid;
        lta->uid = msg->uid;
        lta->gid = msg->gid;
-       lta->compatible = 0;  /* Not compatible until proven */
 
        lta->bits_per_long = msg->bits_per_long;
        lta->uint8_t_alignment = msg->uint8_t_alignment;
@@ -1705,11 +1924,19 @@ struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock)
 
        lta->v_major = msg->major;
        lta->v_minor = msg->minor;
-       strncpy(lta->name, msg->name, sizeof(lta->name));
-       lta->name[LTTNG_UST_ABI_PROCNAME_LEN] = '\0';
        lta->sessions = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        lta->ust_objd = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        lta->notify_sock = -1;
+
+       /* Copy name and make sure it's NULL terminated. */
+       strncpy(lta->name, msg->name, sizeof(lta->name));
+       lta->name[UST_APP_PROCNAME_LEN] = '\0';
+
+       /*
+        * Before this can be called, when receiving the registration information,
+        * the application compatibility is checked. So, at this point, the
+        * application can work with this session daemon.
+        */
        lta->compatible = 1;
 
        lta->pid = msg->pid;
@@ -1723,6 +1950,9 @@ error:
        return lta;
 }
 
+/*
+ * For a given application object, add it to every hash table.
+ */
 void ust_app_add(struct ust_app *app)
 {
        assert(app);
@@ -1748,16 +1978,35 @@ void ust_app_add(struct ust_app *app)
        lttng_ht_add_unique_ulong(ust_app_ht_by_notify_sock, &app->notify_sock_n);
 
        DBG("App registered with pid:%d ppid:%d uid:%d gid:%d sock:%d name:%s "
-                       "(version %d.%d)", app->pid, app->ppid, app->uid, app->gid,
-                       app->sock, app->name, app->v_major, app->v_minor);
+                       "notify_sock:%d (version %d.%d)", app->pid, app->ppid, app->uid,
+                       app->gid, app->sock, app->name, app->notify_sock, app->v_major,
+                       app->v_minor);
 
        rcu_read_unlock();
 }
 
+/*
+ * Set the application version into the object.
+ *
+ * Return 0 on success else a negative value either an errno code or a
+ * LTTng-UST error code.
+ */
 int ust_app_version(struct ust_app *app)
 {
+       int ret;
+
        assert(app);
-       return ustctl_tracer_version(app->sock, &app->version);
+
+       ret = ustctl_tracer_version(app->sock, &app->version);
+       if (ret < 0) {
+               if (ret != -LTTNG_UST_ERR_EXITING && ret != -EPIPE) {
+                       ERR("UST app %d verson failed with ret %d", app->sock, ret);
+               } else {
+                       DBG3("UST app %d verion failed. Application is dead", app->sock);
+               }
+       }
+
+       return ret;
 }
 
 /*
@@ -1788,9 +2037,14 @@ void ust_app_unregister(int sock)
        ret = lttng_ht_del(ust_app_ht_by_sock, &iter);
        assert(!ret);
 
-       /* Remove application from notify hash table */
+       /*
+        * Remove application from notify hash table. The thread handling the
+        * notify socket could have deleted the node so ignore on error because
+        * either way it's valid. The close of that socket is handled by the other
+        * thread.
+        */
        iter.iter.node = &lta->notify_sock_n.node;
-       ret = lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+       (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
 
        /*
         * Ignore return value since the node might have been removed before by an
@@ -1817,7 +2071,24 @@ void ust_app_unregister(int sock)
                 * Add session to list for teardown. This is safe since at this point we
                 * are the only one using this list.
                 */
+               pthread_mutex_lock(&ua_sess->lock);
+
+               /*
+                * Normally, this is done in the delete session process which is
+                * executed in the call rcu below. However, upon registration we can't
+                * afford to wait for the grace period before pushing data or else the
+                * data pending feature can race between the unregistration and stop
+                * command where the data pending command is sent *before* the grace
+                * period ended.
+                *
+                * The close metadata below nullifies the metadata pointer in the
+                * session so the delete session will NOT push/close a second time.
+                */
+               (void) push_metadata(lta, ua_sess);
+               (void) close_metadata(lta, ua_sess);
+
                cds_list_add(&ua_sess->teardown_node, &lta->teardown_head);
+               pthread_mutex_unlock(&ua_sess->lock);
        }
 
        /* Free memory */
@@ -2060,9 +2331,17 @@ void ust_app_clean_list(void)
                assert(!ret);
        }
 
+       /* Cleanup notify socket hash table */
+       cds_lfht_for_each_entry(ust_app_ht_by_notify_sock->ht, &iter.iter, app,
+                       notify_sock_n.node) {
+               ret = lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+               assert(!ret);
+       }
+
        /* Destroy is done only when the ht is empty */
        lttng_ht_destroy(ust_app_ht);
        lttng_ht_destroy(ust_app_ht_by_sock);
+       lttng_ht_destroy(ust_app_ht_by_notify_sock);
 
        rcu_read_unlock();
 }
@@ -2625,14 +2904,16 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app)
        rcu_read_lock();
 
        if (!app->compatible) {
-               goto end;
+               goto end_no_session;
        }
 
        ua_sess = lookup_session_by_app(usess, app);
        if (ua_sess == NULL) {
-               goto end;
+               goto end_no_session;
        }
 
+       pthread_mutex_lock(&ua_sess->lock);
+
        /*
         * If started = 0, it means that stop trace has been called for a session
         * that was never started. It's possible since we can have a fail start
@@ -2682,7 +2963,7 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app)
                                DBG3("UST app failed to flush %s. Application is dead.",
                                                ua_chan->name);
                                /* No need to continue. */
-                               goto end;
+                               break;
                        }
                        /* Continuing flushing all buffers */
                        continue;
@@ -2691,25 +2972,19 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app)
 
        health_code_update();
 
-       assert(ua_sess->metadata->is_sent);
-       /* Flush all buffers before stopping */
-       ret = ustctl_sock_flush_buffer(app->sock, ua_sess->metadata->obj);
+       ret = push_metadata(app, ua_sess);
        if (ret < 0) {
-               if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
-                       ERR("UST app PID %d metadata flush failed with ret %d", app->pid,
-                                       ret);
-                       goto error_rcu_unlock;
-               } else {
-                       DBG3("UST app failed to flush metadata. Application is dead.");
-               }
+               goto error_rcu_unlock;
        }
 
-end:
+       pthread_mutex_unlock(&ua_sess->lock);
+end_no_session:
        rcu_read_unlock();
        health_code_update();
        return 0;
 
 error_rcu_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
        rcu_read_unlock();
        health_code_update();
        return -1;
@@ -2752,7 +3027,6 @@ static int destroy_trace(struct ltt_ust_session *usess, struct ust_app *app)
                ERR("UST app wait quiescent failed for app pid %d ret %d",
                                app->pid, ret);
        }
-
 end:
        rcu_read_unlock();
        health_code_update();
@@ -2801,6 +3075,7 @@ int ust_app_stop_trace_all(struct ltt_ust_session *usess)
        cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
                ret = ust_app_stop_trace(usess, app);
                if (ret < 0) {
+                       ERR("UST app stop trace failed with ret %d", ret);
                        /* Continue to next apps even on error */
                        continue;
                }
@@ -2860,7 +3135,11 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock)
 
        app = find_app_by_sock(sock);
        if (app == NULL) {
-               ERR("Failed to find app sock %d", sock);
+               /*
+                * Application can be unregistered before so this is possible hence
+                * simply stopping the update.
+                */
+               DBG3("UST app update failed to find app sock %d", sock);
                goto error;
        }
 
@@ -3212,6 +3491,11 @@ error:
        return ret;
 }
 
+/*
+ * Return a ust app channel object using the application object and the channel
+ * object descriptor has a key. If not found, NULL is returned. A RCU read side
+ * lock MUST be acquired before calling this function.
+ */
 static struct ust_app_channel *find_channel_by_objd(struct ust_app *app,
                int objd)
 {
@@ -3234,6 +3518,14 @@ error:
        return ua_chan;
 }
 
+/*
+ * Reply to a register channel notification from an application on the notify
+ * socket. The channel metadata is also created.
+ *
+ * The session UST registry lock is acquired in this function.
+ *
+ * On success 0 is returned else a negative value.
+ */
 static int reply_ust_register_channel(int sock, int sobjd, int cobjd,
                size_t nr_fields, struct ustctl_field *fields)
 {
@@ -3248,7 +3540,12 @@ static int reply_ust_register_channel(int sock, int sobjd, int cobjd,
 
        /* Lookup application. If not found, there is a code flow error. */
        app = find_app_by_notify_sock(sock);
-       assert(app);
+       if (!app) {
+               DBG("Application socket %d is being teardown. Abort event notify",
+                               sock);
+               ret = 0;
+               goto error_rcu_unlock;
+       }
 
        /* Lookup channel by UST object descriptor. Should always be found. */
        ua_chan = find_channel_by_objd(app, cobjd);
@@ -3311,10 +3608,20 @@ reply:
 
 error:
        pthread_mutex_unlock(&ua_sess->registry.lock);
+error_rcu_unlock:
        rcu_read_unlock();
        return ret;
 }
 
+/*
+ * Add event to the UST channel registry. When the event is added to the
+ * registry, the metadata is also created. Once done, this replies to the
+ * application with the appropriate error code.
+ *
+ * The session UST registry lock is acquired in the function.
+ *
+ * On success 0 is returned else a negative value.
+ */
 static int add_event_ust_registry(int sock, int sobjd, int cobjd, char *name,
                char *sig, size_t nr_fields, struct ustctl_field *fields, int loglevel,
                char *model_emf_uri)
@@ -3329,7 +3636,12 @@ static int add_event_ust_registry(int sock, int sobjd, int cobjd, char *name,
 
        /* Lookup application. If not found, there is a code flow error. */
        app = find_app_by_notify_sock(sock);
-       assert(app);
+       if (!app) {
+               DBG("Application socket %d is being teardown. Abort event notify",
+                               sock);
+               ret = 0;
+               goto error_rcu_unlock;
+       }
 
        /* Lookup channel by UST object descriptor. Should always be found. */
        ua_chan = find_channel_by_objd(app, cobjd);
@@ -3339,8 +3651,9 @@ static int add_event_ust_registry(int sock, int sobjd, int cobjd, char *name,
 
        pthread_mutex_lock(&ua_sess->registry.lock);
 
-       ret_code = ust_registry_create_event(&ua_sess->registry, &ua_chan->registry, sobjd, cobjd,
-                       name, sig, nr_fields, fields, loglevel, model_emf_uri, &event_id);
+       ret_code = ust_registry_create_event(&ua_sess->registry,
+                       &ua_chan->registry, sobjd, cobjd, name, sig, nr_fields, fields,
+                       loglevel, model_emf_uri, &event_id);
 
        /*
         * The return value is returned to ustctl so in case of an error, the
@@ -3361,12 +3674,20 @@ static int add_event_ust_registry(int sock, int sobjd, int cobjd, char *name,
                goto error;
        }
 
+       DBG3("UST registry event %s has been added successfully", name);
+
 error:
        pthread_mutex_unlock(&ua_sess->registry.lock);
+error_rcu_unlock:
        rcu_read_unlock();
        return ret;
 }
 
+/*
+ * Handle application notification through the given notify socket.
+ *
+ * Return 0 on success or else a negative value.
+ */
 int ust_app_recv_notify(int sock)
 {
        int ret;
@@ -3449,3 +3770,80 @@ int ust_app_recv_notify(int sock)
 error:
        return ret;
 }
+
+/*
+ * Once the notify socket hangs up, this is called. First, it tries to find the
+ * corresponding application. On failure, the call_rcu to close the socket is
+ * executed. If an application is found, it tries to delete it from the notify
+ * socket hash table. Whathever the result, it proceeds to the call_rcu.
+ *
+ * Note that an object needs to be allocated here so on ENOMEM failure, the
+ * call RCU is not done but the rest of the cleanup is.
+ */
+void ust_app_notify_sock_unregister(int sock)
+{
+       int err_enomem = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app *app;
+       struct ust_app_notify_sock_obj *obj;
+
+       assert(sock >= 0);
+
+       rcu_read_lock();
+
+       obj = zmalloc(sizeof(*obj));
+       if (!obj) {
+               /*
+                * An ENOMEM is kind of uncool. If this strikes we continue the
+                * procedure but the call_rcu will not be called. In this case, we
+                * accept the fd leak rather than possibly creating an unsynchronized
+                * state between threads.
+                *
+                * TODO: The notify object should be created once the notify socket is
+                * registered and stored independantely from the ust app object. The
+                * tricky part is to synchronize the teardown of the application and
+                * this notify object. Let's keep that in mind so we can avoid this
+                * kind of shenanigans with ENOMEM in the teardown path.
+                */
+               err_enomem = 1;
+       } else {
+               obj->fd = sock;
+       }
+
+       DBG("UST app notify socket unregister %d", sock);
+
+       /*
+        * Lookup application by notify socket. If this fails, this means that the
+        * hash table delete has already been done by the application
+        * unregistration process so we can safely close the notify socket in a
+        * call RCU.
+        */
+       app = find_app_by_notify_sock(sock);
+       if (!app) {
+               goto close_socket;
+       }
+
+       iter.iter.node = &app->notify_sock_n.node;
+
+       /*
+        * Whatever happens here either we fail or succeed, in both cases we have
+        * to close the socket after a grace period to continue to the call RCU
+        * here. If the deletion is successful, the application is not visible
+        * anymore by other threads and is it fails it means that it was already
+        * deleted from the hash table so either way we just have to close the
+        * socket.
+        */
+       (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+
+close_socket:
+       rcu_read_unlock();
+
+       /*
+        * Close socket after a grace period to avoid for the socket to be reused
+        * before the application object is freed creating potential race between
+        * threads trying to add unique in the global hash table.
+        */
+       if (!err_enomem) {
+               call_rcu(&obj->head, close_notify_sock_rcu);
+       }
+}
index 00d2d5ae402f5204ab9774bf03155eb48bc9f248..c6294d0a4626df9d51b8d602cb72ccbeb89d3cb6 100644 (file)
 #include "trace-ust.h"
 #include "ust-registry.h"
 
-/* lttng-ust supported version. */
-//#define LTTNG_UST_COMM_MAJOR          2      /* comm protocol major version */
-//#define UST_APP_MAJOR_VERSION         3 /* Internal UST version supported */
-
 #define UST_APP_EVENT_LIST_SIZE 32
 
-/* Process name (short). Extra for the NULL byte. */
-#define UST_APP_PROCNAME_LEN   17
+/* Process name (short). */
+#define UST_APP_PROCNAME_LEN   16
 
 struct lttng_filter_bytecode;
 struct lttng_ust_filter_bytecode;
 
 extern int ust_consumerd64_fd, ust_consumerd32_fd;
 
+/*
+ * Object used to close the notify socket in a call_rcu(). Since the
+ * application might not be found, we need an independant object containing the
+ * notify socket fd.
+ */
+struct ust_app_notify_sock_obj {
+       int fd;
+       struct rcu_head head;
+};
+
 struct ust_app_ht_key {
        const char *name;
        const struct lttng_ust_filter_bytecode *filter;
@@ -124,7 +130,7 @@ struct ust_app_channel {
        /* Channel and streams were sent to the UST tracer. */
        int is_sent;
        /* Unique key used to identify the channel on the consumer side. */
-       unsigned long key;
+       uint64_t key;
        /* Number of stream that this channel is expected to receive. */
        unsigned int expected_stream_count;
        char name[LTTNG_UST_SYM_NAME_LEN];
@@ -172,6 +178,11 @@ struct ust_app_session {
        uid_t uid;
        gid_t gid;
        struct cds_list_head teardown_node;
+       /*
+        * Once at least *one* session is created onto the application, the
+        * corresponding consumer is set so we can use it on unregistration.
+        */
+       struct consumer_output *consumer;
 };
 
 /*
@@ -201,7 +212,8 @@ struct ust_app {
        struct lttng_ust_tracer_version version;
        uint32_t v_major;    /* Version major number */
        uint32_t v_minor;    /* Version minor number */
-       char name[UST_APP_PROCNAME_LEN];
+       /* Extra for the NULL byte. */
+       char name[UST_APP_PROCNAME_LEN + 1];
        struct lttng_ht *sessions;
        struct lttng_ht_node_ulong pid_n;
        struct lttng_ht_node_ulong sock_n;
@@ -276,6 +288,7 @@ int ust_app_recv_registration(int sock, struct ust_register_msg *msg);
 int ust_app_recv_notify(int sock);
 void ust_app_add(struct ust_app *app);
 struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock);
+void ust_app_notify_sock_unregister(int sock);
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
@@ -458,6 +471,10 @@ static inline
 void ust_app_add(struct ust_app *app)
 {
 }
+static inline
+void ust_app_notify_sock_unregister(int sock)
+{
+}
 
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
index 60ec5c0b2d623013d7bc2858c2927e207e1b9e4e..33cb6ff376c16c92db8bf605963054b364b59555 100644 (file)
@@ -21,6 +21,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <inttypes.h>
 
 #include <common/common.h>
 #include <common/consumer.h>
@@ -99,7 +100,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
                struct consumer_socket *socket)
 {
        int ret;
-       unsigned long key;
+       uint64_t key;
        char *pathname = NULL;
        struct lttcomm_consumer_msg msg;
 
@@ -151,7 +152,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
        /* We need at least one where 1 stream for 1 cpu. */
        assert(ua_chan->expected_stream_count > 0);
 
-       DBG2("UST ask channel %lu successfully done with %u stream(s)", key,
+       DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key,
                        ua_chan->expected_stream_count);
 
 error:
@@ -383,3 +384,131 @@ int ust_consumer_send_channel_to_ust(struct ust_app *app,
 error:
        return ret;
 }
+
+/*
+ * Send metadata string to consumer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_push_metadata(struct consumer_socket *socket,
+               struct ust_app_session *ua_sess, char *metadata_str,
+               size_t len, size_t target_offset)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+       assert(socket->fd >= 0);
+       assert(ua_sess);
+       assert(ua_sess->metadata);
+
+       DBG2("UST consumer push metadata to consumer socket %d", socket->fd);
+
+       msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
+       msg.u.push_metadata.key = ua_sess->metadata->key;
+       msg.u.push_metadata.target_offset = target_offset;
+       msg.u.push_metadata.len = len;
+
+       /*
+        * TODO: reenable these locks when the consumerd gets the ability to
+        * reorder the metadata it receives. This fits with locking in
+        * src/bin/lttng-sessiond/ust-app.c:push_metadata()
+        *
+        * pthread_mutex_lock(socket->lock);
+        */
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+       DBG3("UST consumer push metadata on sock %d of len %lu", socket->fd, len);
+
+       ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len);
+       if (ret < 0) {
+               fprintf(stderr, "send error: %d\n", ret);
+               goto error;
+       }
+
+       health_code_update();
+       ret = consumer_recv_status_reply(socket);
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       health_code_update();
+       /*
+        * pthread_mutex_unlock(socket->lock);
+        */
+       return ret;
+}
+
+/*
+ * Send a close metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_close_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(ua_chan);
+       assert(socket);
+       assert(socket->fd >= 0);
+
+       DBG2("UST consumer close metadata channel key %lu", ua_chan->key);
+
+       msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
+       msg.u.close_metadata.key = ua_chan->key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
+
+/*
+ * Send a setup metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_setup_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(ua_chan);
+       assert(socket);
+       assert(socket->fd >= 0);
+
+       DBG2("UST consumer setup metadata channel key %lu", ua_chan->key);
+
+       msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
+       msg.u.setup_metadata.key = ua_chan->key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
index f48ea42f27bfdfaff0e34cee692f3364ad590817..8739af5520c2c8d96838930c62e6332e2dbffbc4 100644 (file)
@@ -37,4 +37,14 @@ int ust_consumer_send_stream_to_ust(struct ust_app *app,
 int ust_consumer_send_channel_to_ust(struct ust_app *app,
                struct ust_app_session *ua_sess, struct ust_app_channel *channel);
 
+int ust_consumer_push_metadata(struct consumer_socket *socket,
+               struct ust_app_session *ua_sess, char *metadata_str,
+               size_t len, size_t target_offset);
+
+int ust_consumer_close_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan);
+
+int ust_consumer_setup_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan);
+
 #endif /* _UST_CONSUMER_H */
index f8871ecaad1f460e803d9862b891acd9df937aa9..45512ac7ba0df66ec1b2b19368b175c18198d3ae 100644 (file)
@@ -580,8 +580,7 @@ int ust_metadata_session_statedump(struct ust_registry_session *session,
                "       tracer_name = \"lttng-ust\";\n"
                "       tracer_major = %u;\n"
                "       tracer_minor = %u;\n"
-               "       tracer_patchlevel = %u;\n"
-               "};\n\n",
+               "       tracer_patchlevel = %u;\n",
                hostname,
                app->version.major,
                app->version.minor,
@@ -597,8 +596,7 @@ int ust_metadata_session_statedump(struct ust_registry_session *session,
        if (app) {
                ret = lttng_metadata_printf(session,
                        "       vpid = %d;\n"
-                       "       procname = \"%s\";\n"
-                       "};\n\n",
+                       "       procname = \"%s\";\n",
                        (int) app->pid,
                        app->name
                        );
index ae7ad3ae380d2751f0b669a8ef0bb5170bffc9e7..5efa0828d5a28b824f8938b188c400831442a927 100644 (file)
@@ -60,6 +60,8 @@ struct ust_registry_session {
        /* Generated metadata. */
        char *metadata;         /* NOT null-terminated ! Use memcpy. */
        size_t metadata_len, metadata_alloc_len;
+       /* Length of bytes sent to the consumer. */
+       size_t metadata_len_sent;
 };
 
 struct ust_registry_channel {
index 76d6ef99da84d31c77f89ef02de72d18931940f6..552b7ddd8176a6a246fd90168189dce3dd691aba 100644 (file)
@@ -128,24 +128,20 @@ restart:
                                                goto error;
                                        }
 
-                                       ret = close(pollfd);
-                                       if (ret < 0) {
-                                               PERROR("close sock %d", pollfd);
-                                       }
-                                       lttng_fd_put(LTTNG_FD_APPS, 1);
+                                       /* The socket is closed after a grace period here. */
+                                       ust_app_notify_sock_unregister(pollfd);
                                } else if (revents & (LPOLLIN | LPOLLPRI)) {
                                        ret = ust_app_recv_notify(pollfd);
                                        if (ret < 0) {
-                                               ret = lttng_poll_del(&events, pollfd);
-                                               if (ret < 0) {
-                                                       goto error;
-                                               }
-
-                                               ret = close(pollfd);
-                                               if (ret < 0) {
-                                                       PERROR("close sock %d", pollfd);
-                                               }
-                                               lttng_fd_put(LTTNG_FD_APPS, 1);
+                                               /*
+                                                * If the notification failed either the application is
+                                                * dead or an internal error happened. In both cases,
+                                                * we can only continue here. If the application is
+                                                * dead, an unregistration will follow or else the
+                                                * application will notice that we are not responding
+                                                * on that socket and will close it.
+                                                */
+                                               continue;
                                        }
                                } else {
                                        ERR("Unknown poll events %u for sock %d", revents, pollfd);
index 6272e19b710b1ff2106c0d709c50bb973222523a..25f891af9abf17b3f9e7beb4ed2e75bec671296e 100644 (file)
@@ -82,24 +82,24 @@ static void notify_thread_pipe(int wpipe)
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
  */
-static struct lttng_consumer_stream *find_stream(int key,
+static struct lttng_consumer_stream *find_stream(uint64_t key,
                struct lttng_ht *ht)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_consumer_stream *stream = NULL;
 
        assert(ht);
 
-       /* Negative keys are lookup failures */
-       if (key < 0) {
+       /* -1ULL keys are lookup failures */
+       if (key == (uint64_t) -1ULL) {
                return NULL;
        }
 
        rcu_read_lock();
 
-       lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       lttng_ht_lookup(ht, &key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                stream = caa_container_of(node, struct lttng_consumer_stream, node);
        }
@@ -116,13 +116,13 @@ static void steal_stream_key(int key, struct lttng_ht *ht)
        rcu_read_lock();
        stream = find_stream(key, ht);
        if (stream) {
-               stream->key = -1;
+               stream->key = -1ULL;
                /*
                 * We don't want the lookup to match, but we still need
                 * to iterate on this stream when iterating over the hash table. Just
                 * change the node key.
                 */
-               stream->node.key = -1;
+               stream->node.key = -1ULL;
        }
        rcu_read_unlock();
 }
@@ -133,19 +133,19 @@ static void steal_stream_key(int key, struct lttng_ht *ht)
  * RCU read side lock MUST be acquired before calling this function and
  * protects the channel ptr.
  */
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_consumer_channel *channel = NULL;
 
-       /* Negative keys are lookup failures */
-       if (key < 0) {
+       /* -1ULL keys are lookup failures */
+       if (key == (uint64_t) -1ULL) {
                return NULL;
        }
 
-       lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                channel = caa_container_of(node, struct lttng_consumer_channel, node);
        }
@@ -155,8 +155,8 @@ struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
 
 static void free_stream_rcu(struct rcu_head *head)
 {
-       struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_ht_node_u64 *node =
+               caa_container_of(head, struct lttng_ht_node_u64, head);
        struct lttng_consumer_stream *stream =
                caa_container_of(node, struct lttng_consumer_stream, node);
 
@@ -165,8 +165,8 @@ static void free_stream_rcu(struct rcu_head *head)
 
 static void free_channel_rcu(struct rcu_head *head)
 {
-       struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_ht_node_u64 *node =
+               caa_container_of(head, struct lttng_ht_node_u64, head);
        struct lttng_consumer_channel *channel =
                caa_container_of(node, struct lttng_consumer_channel, node);
 
@@ -178,8 +178,8 @@ static void free_channel_rcu(struct rcu_head *head)
  */
 static void free_relayd_rcu(struct rcu_head *head)
 {
-       struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_ht_node_u64 *node =
+               caa_container_of(head, struct lttng_ht_node_u64, head);
        struct consumer_relayd_sock_pair *relayd =
                caa_container_of(node, struct consumer_relayd_sock_pair, node);
 
@@ -233,7 +233,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        int ret;
        struct lttng_ht_iter iter;
 
-       DBG("Consumer delete channel key %d", channel->key);
+       DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
 
@@ -490,8 +490,8 @@ free_stream_rcu:
        call_rcu(&stream->node.head, free_stream_rcu);
 }
 
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
-               int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+               uint64_t stream_key,
                enum lttng_consumer_stream_state state,
                const char *channel_name,
                uid_t uid,
@@ -540,12 +540,12 @@ struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
        }
 
        /* Key is always the wait_fd for streams. */
-       lttng_ht_node_init_ulong(&stream->node, stream->key);
+       lttng_ht_node_init_u64(&stream->node, stream->key);
 
        /* Init session id node with the stream session id */
-       lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+       lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
 
-       DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64,
+       DBG3("Allocated stream %s (key %" PRIu64 ", relayd_id %" PRIu64 ", session_id %" PRIu64,
                        stream->name, stream->key, stream->net_seq_idx, stream->session_id);
 
        rcu_read_unlock();
@@ -573,7 +573,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
        assert(stream);
        assert(ht);
 
-       DBG3("Adding consumer stream %d", stream->key);
+       DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->lock);
@@ -582,14 +582,14 @@ static int add_stream(struct lttng_consumer_stream *stream,
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
 
-       lttng_ht_add_unique_ulong(ht, &stream->node);
+       lttng_ht_add_unique_u64(ht, &stream->node);
 
        /*
         * Add stream to the stream_list_ht of the consumer data. No need to steal
         * the key since the HT does not use it and we allow to add redundant keys
         * into this table.
         */
-       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+       lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
        /* Check and cleanup relayd */
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -629,18 +629,18 @@ static int add_stream(struct lttng_consumer_stream *stream,
 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret = 0;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
        assert(relayd);
 
        lttng_ht_lookup(consumer_data.relayd_ht,
-                       (void *)((unsigned long) relayd->net_seq_idx), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+                       &relayd->net_seq_idx, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                goto end;
        }
-       lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
+       lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
 
 end:
        return ret;
@@ -668,7 +668,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
        obj->net_seq_idx = net_seq_idx;
        obj->refcount = 0;
        obj->destroy_flag = 0;
-       lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
+       lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
        pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
 
 error:
@@ -682,20 +682,20 @@ error:
  * RCU read-side lock must be held across this call and while using the
  * returned object.
  */
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        /* Negative keys are lookup failures */
-       if (key < 0) {
+       if (key == (uint64_t) -1ULL) {
                goto error;
        }
 
-       lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
+       lttng_ht_lookup(consumer_data.relayd_ht, &key,
                        &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
        }
@@ -801,10 +801,10 @@ struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
        strncpy(channel->name, name, sizeof(channel->name));
        channel->name[sizeof(channel->name) - 1] = '\0';
 
-       lttng_ht_node_init_ulong(&channel->node, channel->key);
+       lttng_ht_node_init_u64(&channel->node, channel->key);
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
-       DBG("Allocated channel (key %d)", channel->key)
+       DBG("Allocated channel (key %" PRIu64 ")", channel->key)
 
 end:
        return channel;
@@ -816,23 +816,24 @@ end:
 int consumer_add_channel(struct lttng_consumer_channel *channel)
 {
        int ret = 0;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht,
-                       (void *)((unsigned long) channel->key), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+                       &channel->key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                /* Channel already exist. Ignore the insertion */
-               ERR("Consumer add channel key %d already exists!", channel->key);
+               ERR("Consumer add channel key %" PRIu64 " already exists!",
+                       channel->key);
                ret = -1;
                goto end;
        }
 
-       lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+       lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
 
 end:
        rcu_read_unlock();
@@ -1727,6 +1728,32 @@ static void destroy_stream_ht(struct lttng_ht *ht)
        lttng_ht_destroy(ht);
 }
 
+void lttng_consumer_close_metadata(void)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               /*
+                * The Kernel consumer has a different metadata scheme so we don't
+                * close anything because the stream will be closed by the session
+                * daemon.
+                */
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               /*
+                * Close all metadata streams. The metadata hash table is passed and
+                * this call iterates over it by closing all wakeup fd. This is safe
+                * because at this point we are sure that the metadata producer is
+                * either dead or blocked.
+                */
+               lttng_ustconsumer_close_metadata(metadata_ht);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+       }
+}
+
 /*
  * Clean up a metadata stream and free its memory.
  */
@@ -1851,12 +1878,12 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
 
        assert(stream);
        assert(ht);
 
-       DBG3("Adding metadata stream %d to hash table", stream->key);
+       DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->lock);
@@ -1872,8 +1899,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
         * Lookup the stream just to make sure it does not exist in our internal
         * state. This should NEVER happen.
         */
-       lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       lttng_ht_lookup(ht, &stream->key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        assert(!node);
 
        /* Find relayd and, if one is found, increment refcount. */
@@ -1896,14 +1923,14 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_dec(&stream->chan->nb_init_stream_left);
        }
 
-       lttng_ht_add_unique_ulong(ht, &stream->node);
+       lttng_ht_add_unique_u64(ht, &stream->node);
 
        /*
         * Add stream to the stream_list_ht of the consumer data. No need to steal
         * the key since the HT does not use it and we allow to add redundant keys
         * into this table.
         */
-       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+       lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
        rcu_read_unlock();
 
@@ -1975,14 +2002,14 @@ void *consumer_thread_metadata_poll(void *data)
        uint32_t revents, nb_fd;
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_poll_event events;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
        rcu_register_thread();
 
-       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
                goto error;
@@ -2091,9 +2118,12 @@ restart:
                        }
 
                        rcu_read_lock();
-                       lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
-                                       &iter);
-                       node = lttng_ht_iter_get_node_ulong(&iter);
+                       {
+                               uint64_t tmp_id = (uint64_t) pollfd;
+
+                               lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
+                       }
+                       node = lttng_ht_iter_get_node_u64(&iter);
                        assert(node);
 
                        stream = caa_container_of(node, struct lttng_consumer_stream,
@@ -2175,7 +2205,7 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
-       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
                goto end;
@@ -2284,7 +2314,7 @@ void *consumer_thread_data_poll(void *data)
 
                        ret = add_stream(new_stream, data_ht);
                        if (ret) {
-                               ERR("Consumer add stream %d failed. Continuing",
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
                                                new_stream->key);
                                /*
                                 * At this point, if the add_stream fails, it is not in the
@@ -2520,6 +2550,14 @@ void *consumer_thread_sessiond_poll(void *data)
 end:
        DBG("Consumer thread sessiond poll exiting");
 
+       /*
+        * Close metadata streams since the producer is the session daemon which
+        * just died.
+        *
+        * NOTE: for now, this only applies to the UST tracer.
+        */
+       lttng_consumer_close_metadata();
+
        /*
         * when all fds have hung up, the polling thread
         * can exit cleanly
@@ -2596,9 +2634,9 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
  */
 void lttng_consumer_init(void)
 {
-       consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-       consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-       consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
 }
 
 /*
@@ -2729,7 +2767,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                goto error;
        }
 
-       DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+       DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
 
@@ -2866,8 +2904,8 @@ int consumer_data_pending(uint64_t id)
        }
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
-                       ht->match_fct, (void *)((unsigned long) id),
+                       ht->hash_fct(&id, lttng_ht_seed),
+                       ht->match_fct, &id,
                        &iter.iter, stream, node_session_id.node) {
                /* If this call fails, the stream is being used hence data pending. */
                ret = stream_try_lock(stream);
index 92f9e20957f09351834a366a38a1a708498c8dad..a3e1ec3535094f169042d1bd70dae73fc19d5f58 100644 (file)
@@ -49,6 +49,9 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_ASK_CHANNEL_CREATION,
        LTTNG_CONSUMER_GET_CHANNEL,
        LTTNG_CONSUMER_DESTROY_CHANNEL,
+       LTTNG_CONSUMER_PUSH_METADATA,
+       LTTNG_CONSUMER_CLOSE_METADATA,
+       LTTNG_CONSUMER_SETUP_METADATA,
 };
 
 /* State of each fd in consumer */
@@ -77,7 +80,7 @@ enum consumer_channel_output {
 
 enum consumer_channel_type {
        CONSUMER_CHANNEL_TYPE_METADATA  = 0,
-       CONSUMER_CHANNEL_TYPE_DATA              = 1,
+       CONSUMER_CHANNEL_TYPE_DATA      = 1,
 };
 
 struct stream_list {
@@ -87,9 +90,9 @@ struct stream_list {
 
 struct lttng_consumer_channel {
        /* HT node used for consumer_data.channel_ht */
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
        /* Indexed key. Incremented value in the consumer. */
-       int key;
+       uint64_t key;
        /* Number of streams referencing this channel */
        int refcount;
        /* Tracing session id on the session daemon side. */
@@ -102,7 +105,7 @@ struct lttng_consumer_channel {
        uid_t uid;
        gid_t gid;
        /* Relayd id of the channel. -1 if it does not apply. */
-       int relayd_id;
+       int64_t relayd_id;
        /*
         * Number of streams NOT initialized yet. This is used in order to not
         * delete this channel if streams are getting initialized.
@@ -122,6 +125,17 @@ struct lttng_consumer_channel {
         * LTTNG_CONSUMER_GET_CHANNEL.
         */
        struct stream_list streams;
+       /*
+        * Set if the channel is metadata. We keep a reference to the stream
+        * because we have to flush data once pushed by the session daemon. For a
+        * regular channel, this is always set to NULL.
+        */
+       struct lttng_consumer_stream *metadata_stream;
+       /*
+        * Metadata written so far. Helps keeping track of
+        * contiguousness and order.
+        */
+       uint64_t contig_metadata_written;
 };
 
 /*
@@ -130,14 +144,14 @@ struct lttng_consumer_channel {
  */
 struct lttng_consumer_stream {
        /* HT node used by the data_ht and metadata_ht */
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
        /* HT node used in consumer_data.stream_list_ht */
-       struct lttng_ht_node_ulong node_session_id;
+       struct lttng_ht_node_u64 node_session_id;
        /* Pointer to associated channel. */
        struct lttng_consumer_channel *chan;
 
        /* Key by which the stream is indexed for 'node'. */
-       int key;
+       uint64_t key;
        /*
         * File descriptor of the data output file. This can be either a file or a
         * socket fd for relayd streaming.
@@ -167,7 +181,7 @@ struct lttng_consumer_stream {
        uid_t uid;
        gid_t gid;
        /* Network sequence number. Indicating on which relayd socket it goes. */
-       int net_seq_idx;
+       uint64_t net_seq_idx;
        /* Identify if the stream is the metadata */
        unsigned int metadata_flag;
        /* Used when the stream is set for network streaming */
@@ -214,7 +228,7 @@ struct lttng_consumer_stream {
  */
 struct consumer_relayd_sock_pair {
        /* Network sequence number. */
-       int net_seq_idx;
+       int64_t net_seq_idx;
        /* Number of stream associated with this relayd */
        unsigned int refcount;
 
@@ -245,7 +259,7 @@ struct consumer_relayd_sock_pair {
         * this socket is for now only used in a single thread.
         */
        struct lttcomm_sock data_sock;
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
 
        /* Session id on both sides for the sockets. */
        uint64_t relayd_session_id;
@@ -407,8 +421,8 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
  */
 int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
 
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
-               int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+               uint64_t stream_key,
                enum lttng_consumer_stream_state state,
                const char *channel_name,
                uid_t uid,
@@ -418,7 +432,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type);
-struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
                const char *pathname,
                const char *name,
@@ -436,8 +450,8 @@ void consumer_del_channel(struct lttng_consumer_channel *channel);
 /* lttng-relayd consumer command */
 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
                int net_seq_idx);
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key);
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                size_t data_size);
 void consumer_steal_stream_key(int key, struct lttng_ht *ht);
index 24d579773e0ab26d89cfb5f078f84562ce4431df..263df46839d37af58e2efe66770f5e1833c90f0f 100644 (file)
@@ -54,6 +54,17 @@ static int match_ulong(struct cds_lfht_node *node, const void *key)
        return hash_match_key_ulong((void *) match_node->key, (void *) key);
 }
 
+/*
+ * Match function for u64 node.
+ */
+static int match_u64(struct cds_lfht_node *node, const void *key)
+{
+       struct lttng_ht_node_u64 *match_node =
+               caa_container_of(node, struct lttng_ht_node_u64, node);
+
+       return hash_match_key_u64(&match_node->key, (void *) key);
+}
+
 /*
  * Return an allocated lttng hashtable.
  */
@@ -88,6 +99,10 @@ struct lttng_ht *lttng_ht_new(unsigned long size, int type)
                ht->match_fct = match_ulong;
                ht->hash_fct = hash_key_ulong;
                break;
+       case LTTNG_HT_TYPE_U64:
+               ht->match_fct = match_u64;
+               ht->hash_fct = hash_key_u64;
+               break;
        default:
                ERR("Unknown lttng hashtable type %d", type);
                goto error;
@@ -136,6 +151,18 @@ void lttng_ht_node_init_ulong(struct lttng_ht_node_ulong *node,
        cds_lfht_node_init(&node->node);
 }
 
+/*
+ * Init lttng ht node uint64_t.
+ */
+void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node,
+               uint64_t key)
+{
+       assert(node);
+
+       node->key = key;
+       cds_lfht_node_init(&node->node);
+}
+
 /*
  * Free lttng ht node string.
  */
@@ -154,6 +181,15 @@ void lttng_ht_node_free_ulong(struct lttng_ht_node_ulong *node)
        free(node);
 }
 
+/*
+ * Free lttng ht node uint64_t.
+ */
+void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node)
+{
+       assert(node);
+       free(node);
+}
+
 /*
  * Lookup function in hashtable.
  */
@@ -196,6 +232,20 @@ void lttng_ht_add_ulong(struct lttng_ht *ht, struct lttng_ht_node_ulong *node)
                        &node->node);
 }
 
+/*
+ * Add uint64_t node to hashtable.
+
+ */
+void lttng_ht_add_u64(struct lttng_ht *ht, struct lttng_ht_node_u64 *node)
+{
+       assert(ht);
+       assert(ht->ht);
+       assert(node);
+
+       cds_lfht_add(ht->ht, ht->hash_fct(&node->key, lttng_ht_seed),
+                       &node->node);
+}
+
 /*
  * Add unique unsigned long node to hashtable.
  */
@@ -213,6 +263,23 @@ void lttng_ht_add_unique_ulong(struct lttng_ht *ht,
        assert(node_ptr == &node->node);
 }
 
+/*
+ * Add unique uint64_t node to hashtable.
+ */
+void lttng_ht_add_unique_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node)
+{
+       struct cds_lfht_node *node_ptr;
+       assert(ht);
+       assert(ht->ht);
+       assert(node);
+
+       node_ptr = cds_lfht_add_unique(ht->ht,
+                       ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct,
+                       &node->key, &node->node);
+       assert(node_ptr == &node->node);
+}
+
 /*
  * Add replace unsigned long node to hashtable.
  */
@@ -235,6 +302,28 @@ struct lttng_ht_node_ulong *lttng_ht_add_replace_ulong(struct lttng_ht *ht,
        assert(node_ptr == &node->node);
 }
 
+/*
+ * Add replace unsigned long node to hashtable.
+ */
+struct lttng_ht_node_u64 *lttng_ht_add_replace_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node)
+{
+       struct cds_lfht_node *node_ptr;
+       assert(ht);
+       assert(ht->ht);
+       assert(node);
+
+       node_ptr = cds_lfht_add_replace(ht->ht,
+                       ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct,
+                       &node->key, &node->node);
+       if (!node_ptr) {
+               return NULL;
+       } else {
+               return caa_container_of(node_ptr, struct lttng_ht_node_u64, node);
+       }
+       assert(node_ptr == &node->node);
+}
+
 /*
  * Delete node from hashtable.
  */
@@ -319,10 +408,26 @@ struct lttng_ht_node_ulong *lttng_ht_iter_get_node_ulong(
        return caa_container_of(node, struct lttng_ht_node_ulong, node);
 }
 
+/*
+ * Return lttng ht unsigned long node from iterator.
+ */
+struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64(
+               struct lttng_ht_iter *iter)
+{
+       struct cds_lfht_node *node;
+
+       assert(iter);
+       node = cds_lfht_iter_get_node(&iter->iter);
+       if (!node) {
+               return NULL;
+       }
+       return caa_container_of(node, struct lttng_ht_node_u64, node);
+}
+
 /*
  * lib constructor
  */
-static void __attribute__((constructor)) _init()
+static void __attribute__((constructor)) _init(void)
 {
        /* Init hash table seed */
        lttng_ht_seed = (unsigned long) time(NULL);
index 4007a9c6c6b29696313415c60473f198cc859a6a..b4c1909b79796fe1a84b4714664939697a5488b2 100644 (file)
@@ -19,6 +19,7 @@
 #define _LTT_HT_H
 
 #include <urcu.h>
+#include <stdint.h>
 
 #include "rculfhash.h"
 #include "rculfhash-internal.h"
@@ -31,6 +32,7 @@ typedef cds_lfht_match_fct hash_match_fct;
 enum lttng_ht_type {
        LTTNG_HT_TYPE_STRING,
        LTTNG_HT_TYPE_ULONG,
+       LTTNG_HT_TYPE_U64,
 };
 
 struct lttng_ht {
@@ -55,6 +57,12 @@ struct lttng_ht_node_ulong {
        struct rcu_head head;
 };
 
+struct lttng_ht_node_u64 {
+       uint64_t key;
+       struct cds_lfht_node node;
+       struct rcu_head head;
+};
+
 /* Hashtable new and destroy */
 extern struct lttng_ht *lttng_ht_new(unsigned long size, int type);
 extern void lttng_ht_destroy(struct lttng_ht *ht);
@@ -63,8 +71,11 @@ extern void lttng_ht_destroy(struct lttng_ht *ht);
 extern void lttng_ht_node_init_str(struct lttng_ht_node_str *node, char *key);
 extern void lttng_ht_node_init_ulong(struct lttng_ht_node_ulong *node,
                unsigned long key);
+extern void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node,
+               uint64_t key);
 extern void lttng_ht_node_free_str(struct lttng_ht_node_str *node);
 extern void lttng_ht_node_free_ulong(struct lttng_ht_node_ulong *node);
+extern void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node);
 
 extern void lttng_ht_lookup(struct lttng_ht *ht, void *key,
                struct lttng_ht_iter *iter);
@@ -74,10 +85,16 @@ extern void lttng_ht_add_unique_str(struct lttng_ht *ht,
                struct lttng_ht_node_str *node);
 extern void lttng_ht_add_unique_ulong(struct lttng_ht *ht,
                struct lttng_ht_node_ulong *node);
+extern void lttng_ht_add_unique_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node);
 extern struct lttng_ht_node_ulong *lttng_ht_add_replace_ulong(
                struct lttng_ht *ht, struct lttng_ht_node_ulong *node);
+extern struct lttng_ht_node_u64 *lttng_ht_add_replace_u64(
+               struct lttng_ht *ht, struct lttng_ht_node_u64 *node);
 extern void lttng_ht_add_ulong(struct lttng_ht *ht,
                struct lttng_ht_node_ulong *node);
+extern void lttng_ht_add_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node);
 
 extern int lttng_ht_del(struct lttng_ht *ht, struct lttng_ht_iter *iter);
 
@@ -91,5 +108,7 @@ extern struct lttng_ht_node_str *lttng_ht_iter_get_node_str(
                struct lttng_ht_iter *iter);
 extern struct lttng_ht_node_ulong *lttng_ht_iter_get_node_ulong(
                struct lttng_ht_iter *iter);
+extern struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64(
+               struct lttng_ht_iter *iter);
 
 #endif /* _LTT_HT_H */
index 850f9e5db3d905be2f83da28221fc18ab6e589cb..8d0e515aecafbb94a347c418f23bf341f719f71c 100644 (file)
@@ -446,12 +446,8 @@ static uint32_t __attribute__((unused)) hashlittle(const void *key,
        return c;
 }
 
-#if (CAA_BITS_PER_LONG == 64)
-/*
- * Hash function for number value.
- */
 LTTNG_HIDDEN
-unsigned long hash_key_ulong(void *_key, unsigned long seed)
+unsigned long hash_key_u64(void *_key, unsigned long seed)
 {
        union {
                uint64_t v64;
@@ -463,10 +459,21 @@ unsigned long hash_key_ulong(void *_key, unsigned long seed)
        } key;
 
        v.v64 = (uint64_t) seed;
-       key.v64 = (uint64_t) _key;
+       key.v64 = *(uint64_t *) _key;
        hashword2(key.v32, 2, &v.v32[0], &v.v32[1]);
        return v.v64;
 }
+
+#if (CAA_BITS_PER_LONG == 64)
+/*
+ * Hash function for number value.
+ */
+LTTNG_HIDDEN
+unsigned long hash_key_ulong(void *_key, unsigned long seed)
+{
+       uint64_t __key = (uint64_t) _key;
+       return (unsigned long) hash_key_u64(&__key, seed);
+}
 #else
 /*
  * Hash function for number value.
@@ -502,6 +509,19 @@ int hash_match_key_ulong(void *key1, void *key2)
        return 0;
 }
 
+/*
+ * Hash function compare for number value.
+ */
+LTTNG_HIDDEN
+int hash_match_key_u64(void *key1, void *key2)
+{
+       if (*(uint64_t *) key1 == *(uint64_t *) key2) {
+               return 1;
+       }
+
+       return 0;
+}
+
 /*
  * Hash compare function for string.
  */
index 4f0890892ef4b27bf964e3be5b7765fac77ec13e..38d6121e4ce65cd891e7d1ce6d485515596985dd 100644 (file)
 #include <stdint.h>
 
 unsigned long hash_key_ulong(void *_key, unsigned long seed);
+unsigned long hash_key_u64(void *_key, unsigned long seed);
 unsigned long hash_key_str(void *key, unsigned long seed);
 int hash_match_key_ulong(void *key1, void *key2);
+int hash_match_key_u64(void *key1, void *key2);
 int hash_match_key_str(void *key1, void *key2);
 
 #endif /* _LTT_HT_UTILS_H */
index 9d75d3f09cfc9ada6f30503a6c7ddfe306366809..ca1e98be1f359dc08dfba8a41d3c61832ad23d9f 100644 (file)
@@ -128,7 +128,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               DBG("consumer_add_channel %d", msg.u.channel.channel_key);
+               DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
                                msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
@@ -180,7 +180,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                         * We could not find the channel. Can happen if cpu hotplug
                         * happens while tearing down.
                         */
-                       ERR("Unable to find channel key %d", msg.u.stream.channel_key);
+                       ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
@@ -265,8 +265,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
-               } else if (new_stream->net_seq_idx != -1) {
-                       ERR("Network sequence index %d unknown. Not adding stream.",
+               } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+                       ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
                                        new_stream->net_seq_idx);
                        consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
@@ -464,8 +464,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 * network streaming or the full padding (len) size when we are _not_
                 * streaming.
                 */
-               if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
-                               (ret != len && stream->net_seq_idx == -1)) {
+               if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+                               (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
                        /*
                         * Display the error but continue processing to try to release the
                         * subbuffer
@@ -513,7 +513,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        }
 
        /* Opening the tracefile in write mode */
-       if (stream->net_seq_idx == -1) {
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
                ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
                                S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid);
                if (ret < 0) {
index 0293482fb966e3426a41248d20541c0903332ac1..60a3eade6171faf18f886078723ed48b2c1cc2f5 100644 (file)
@@ -117,6 +117,8 @@ enum lttcomm_return_code {
        LTTCOMM_CONSUMERD_SPLICE_ENOMEM,            /* ENOMEM from splice(2) */
        LTTCOMM_CONSUMERD_SPLICE_ESPIPE,            /* ESPIPE from splice(2) */
        LTTCOMM_CONSUMERD_ENOMEM,                   /* Consumer is out of memory */
+       LTTCOMM_CONSUMERD_ERROR_METADATA,           /* Error with metadata. */
+       LTTCOMM_CONSUMERD_FATAL,                    /* Fatal error. */
 
        /* MUST be last element */
        LTTCOMM_NR,                                             /* Last element */
@@ -145,7 +147,7 @@ struct lttcomm_sockaddr {
 } LTTNG_PACKED;
 
 struct lttcomm_sock {
-       int fd;
+       int32_t fd;
        enum lttcomm_sock_proto proto;
        struct lttcomm_sockaddr sockaddr;
        const struct lttcomm_proto_ops *ops;
@@ -259,26 +261,26 @@ struct lttcomm_consumer_msg {
        uint32_t cmd_type;      /* enum consumerd_command */
        union {
                struct {
-                       int channel_key;
+                       uint64_t channel_key;
                        uint64_t session_id;
                        char pathname[PATH_MAX];
-                       uid_t uid;
-                       gid_t gid;
-                       int relayd_id;
+                       uint32_t uid;
+                       uint32_t gid;
+                       uint64_t relayd_id;
                        /* nb_init_streams is the number of streams open initially. */
-                       unsigned int nb_init_streams;
+                       uint32_t nb_init_streams;
                        char name[LTTNG_SYMBOL_NAME_LEN];
                        /* Use splice or mmap to consume this fd */
                        enum lttng_event_output output;
                        int type; /* Per cpu or metadata. */
                } LTTNG_PACKED channel; /* Only used by Kernel. */
                struct {
-                       int stream_key;
-                       int channel_key;
-                       int cpu;        /* On which CPU this stream is assigned. */
+                       uint64_t stream_key;
+                       uint64_t channel_key;
+                       int32_t cpu;    /* On which CPU this stream is assigned. */
                } LTTNG_PACKED stream;  /* Only used by Kernel. */
                struct {
-                       int net_index;
+                       uint64_t net_index;
                        enum lttng_stream_type type;
                        /* Open socket to the relayd */
                        struct lttcomm_sock sock;
@@ -294,26 +296,37 @@ struct lttcomm_consumer_msg {
                struct {
                        uint64_t subbuf_size;                   /* bytes */
                        uint64_t num_subbuf;                    /* power of 2 */
-                       int overwrite;                                          /* 1: overwrite, 0: discard */
-                       unsigned int switch_timer_interval;     /* usec */
-                       unsigned int read_timer_interval;       /* usec */
-                       int output;                             /* splice, mmap */
-                       int type;                               /* metadata or per_cpu */
+                       int32_t overwrite;                      /* 1: overwrite, 0: discard */
+                       uint32_t switch_timer_interval;         /* usec */
+                       uint32_t read_timer_interval;           /* usec */
+                       int32_t output;                         /* splice, mmap */
+                       int32_t type;                           /* metadata or per_cpu */
                        uint64_t session_id;                    /* Tracing session id */
                        char pathname[PATH_MAX];                /* Channel file path. */
                        char name[LTTNG_SYMBOL_NAME_LEN];       /* Channel name. */
-                       uid_t uid;                              /* User ID of the session */
-                       gid_t gid;                              /* Group ID ot the session */
-                       int relayd_id;                          /* Relayd id if apply. */
-                       unsigned long key;                                      /* Unique channel key. */
+                       uint32_t uid;                           /* User ID of the session */
+                       uint32_t gid;                           /* Group ID ot the session */
+                       uint64_t relayd_id;                     /* Relayd id if apply. */
+                       uint64_t key;                           /* Unique channel key. */
                        unsigned char uuid[UUID_STR_LEN];       /* uuid for ust tracer. */
                } LTTNG_PACKED ask_channel;
                struct {
-                       unsigned long key;
+                       uint64_t key;
                } LTTNG_PACKED get_channel;
                struct {
-                       unsigned long key;
+                       uint64_t key;
                } LTTNG_PACKED destroy_channel;
+               struct {
+                       uint64_t key;   /* Metadata channel key. */
+                       uint64_t target_offset; /* Offset in the consumer */
+                       uint64_t len;   /* Length of metadata to be received. */
+               } LTTNG_PACKED push_metadata;
+               struct {
+                       uint64_t key;   /* Metadata channel key. */
+               } LTTNG_PACKED close_metadata;
+               struct {
+                       uint64_t key;   /* Metadata channel key. */
+               } LTTNG_PACKED setup_metadata;
        } u;
 } LTTNG_PACKED;
 
@@ -326,7 +339,7 @@ struct lttcomm_consumer_status_msg {
 
 struct lttcomm_consumer_status_channel {
        enum lttng_error_code ret_code;
-       unsigned long key;
+       uint64_t key;
        unsigned int stream_count;
 } LTTNG_PACKED;
 
index 442925754c634ec4d067382b5cdd7ed8f35b385e..a6a4f1a91735911633a8d85d098e63cd01425417 100644 (file)
@@ -98,7 +98,7 @@ static int add_channel(struct lttng_consumer_channel *channel,
                ret = consumer_add_channel(channel);
        }
 
-       DBG("UST consumer channel added (key: %u)", channel->key);
+       DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
 
 error:
        return ret;
@@ -109,7 +109,7 @@ error:
  */
 static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
                const char *pathname, const char *name, uid_t uid, gid_t gid,
-               int relayd_id, unsigned long key, enum lttng_event_output output)
+               int relayd_id, uint64_t key, enum lttng_event_output output)
 {
        assert(pathname);
        assert(name);
@@ -223,8 +223,8 @@ static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
                if (ret < 0) {
                        goto error;
                }
-       } else if (stream->net_seq_idx != -1) {
-               ERR("Network sequence index %d unknown. Not adding stream.",
+       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
                                stream->net_seq_idx);
                ret = -1;
                goto error;
@@ -234,6 +234,11 @@ error:
        return ret;
 }
 
+/*
+ * Create streams for the given channel using liblttng-ust-ctl.
+ *
+ * Return 0 on success else a negative value.
+ */
 static int create_ust_streams(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
 {
@@ -288,11 +293,16 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                        }
                }
 
-               DBG("UST consumer add stream %s (key: %d) with relayd id %" PRIu64,
+               DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
                                stream->name, stream->key, stream->relayd_stream_id);
 
                /* Set next CPU stream. */
                channel->streams.count = ++cpu;
+
+               /* Keep stream reference when creating metadata. */
+               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+                       channel->metadata_stream = stream;
+               }
        }
 
        return 0;
@@ -338,6 +348,11 @@ error_create:
        return ret;
 }
 
+/*
+ * Send a single given stream to the session daemon using the sock.
+ *
+ * Return 0 on success else a negative value.
+ */
 static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
 {
        int ret;
@@ -345,7 +360,7 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
        assert(stream);
        assert(sock >= 0);
 
-       DBG2("UST consumer sending stream %d to sessiond", stream->key);
+       DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
 
        /* Send stream to session daemon. */
        ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
@@ -365,8 +380,7 @@ error:
 /*
  * Send channel to sessiond.
  *
- * Return 0 on success or else a negative value. On error, the channel is
- * destroy using ustctl.
+ * Return 0 on success or else a negative value.
  */
 static int send_sessiond_channel(int sock,
                struct lttng_consumer_channel *channel,
@@ -472,6 +486,155 @@ error:
        return ret;
 }
 
+/*
+ * Send all stream of a channel to the right thread handling it.
+ *
+ * On error, return a negative value else 0 on success.
+ */
+static int send_streams_to_thread(struct lttng_consumer_channel *channel,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+       struct lttng_consumer_stream *stream, *stmp;
+
+       assert(channel);
+       assert(ctx);
+
+       /* Send streams to the corresponding thread. */
+       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                       send_node) {
+               /* Sending the stream to the thread. */
+               ret = send_stream_to_thread(stream, ctx);
+               if (ret < 0) {
+                       /*
+                        * If we are unable to send the stream to the thread, there is
+                        * a big problem so just stop everything.
+                        */
+                       goto error;
+               }
+
+               /* Remove node from the channel stream list. */
+               cds_list_del(&stream->send_node);
+       }
+
+error:
+       return ret;
+}
+
+/*
+ * Write metadata to the given channel using ustctl to convert the string to
+ * the ringbuffer.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int push_metadata(struct lttng_consumer_channel *metadata,
+               const char *metadata_str, uint64_t target_offset, uint64_t len)
+{
+       int ret;
+
+       assert(metadata);
+       assert(metadata_str);
+
+       DBG("UST consumer writing metadata to channel %s", metadata->name);
+
+       assert(target_offset == metadata->contig_metadata_written);
+       ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
+       if (ret < 0) {
+               ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
+               goto error;
+       }
+       metadata->contig_metadata_written += len;
+
+       ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
+
+error:
+       return ret;
+}
+
+/*
+ * Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int close_metadata(uint64_t chan_key)
+{
+       int ret;
+       struct lttng_consumer_channel *channel;
+
+       DBG("UST consumer close metadata key %lu", chan_key);
+
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer close metadata %lu not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+       if (ret < 0) {
+               ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               goto error;
+       }
+
+error:
+       return ret;
+}
+
+/*
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
+{
+       int ret;
+       struct lttng_consumer_channel *metadata;
+
+       DBG("UST consumer setup metadata key %lu", key);
+
+       metadata = consumer_find_channel(key);
+       if (!metadata) {
+               ERR("UST consumer push metadata %" PRIu64 " not found", key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       /*
+        * Send metadata stream to relayd if one available. Availability is
+        * known if the stream is still in the list of the channel.
+        */
+       if (cds_list_empty(&metadata->streams.head)) {
+               ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
+               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               goto error;
+       }
+
+       /* Send metadata stream to relayd if needed. */
+       ret = send_stream_to_relayd(metadata->metadata_stream);
+       if (ret < 0) {
+               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               goto error;
+       }
+
+       ret = send_streams_to_thread(metadata, ctx);
+       if (ret < 0) {
+               /*
+                * If we are unable to send the stream to the thread, there is
+                * a big problem so just stop everything.
+                */
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+       /* List MUST be empty after or else it could be reused. */
+       assert(cds_list_empty(&metadata->streams.head));
+
+       ret = 0;
+
+error:
+       return ret;
+}
+
 /*
  * Receive command from session daemon and process it.
  *
@@ -548,13 +711,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_flag_relayd_for_destroy(relayd);
                }
 
-               ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0) {
-                       /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
-               }
-
-               goto end_nosignal;
+               goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
@@ -665,9 +822,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_GET_CHANNEL:
        {
                int ret, relayd_err = 0;
-               unsigned long key = msg.u.get_channel.key;
+               uint64_t key = msg.u.get_channel.key;
                struct lttng_consumer_channel *channel;
-               struct lttng_consumer_stream *stream, *stmp;
 
                channel = consumer_find_channel(key);
                if (!channel) {
@@ -702,58 +858,108 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto error_fatal;
                }
 
-               /* Send streams to the corresponding thread. */
-               cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                               send_node) {
-                       /* Sending the stream to the thread. */
-                       ret = send_stream_to_thread(stream, ctx);
-                       if (ret < 0) {
-                               /*
-                                * If we are unable to send the stream to the thread, there is
-                                * a big problem so just stop everything.
-                                */
-                               goto error_fatal;
-                       }
-
-                       /* Remove node from the channel stream list. */
-                       cds_list_del(&stream->send_node);
+               ret = send_streams_to_thread(channel, ctx);
+               if (ret < 0) {
+                       /*
+                        * If we are unable to send the stream to the thread, there is
+                        * a big problem so just stop everything.
+                        */
+                       goto error_fatal;
                }
-
                /* List MUST be empty after or else it could be reused. */
                assert(cds_list_empty(&channel->streams.head));
 
-               /* Inform sessiond that everything is done and OK on our side. */
-               ret = consumer_send_status_msg(sock, LTTNG_OK);
-               if (ret < 0) {
-                       /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
+               goto end_msg_sessiond;
+       }
+       case LTTNG_CONSUMER_DESTROY_CHANNEL:
+       {
+               uint64_t key = msg.u.destroy_channel.key;
+               struct lttng_consumer_channel *channel;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("UST consumer get channel key %lu not found", key);
+                       ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+                       goto end_msg_sessiond;
                }
 
-               break;
+               destroy_channel(channel);
+
+               goto end_msg_sessiond;
        }
-       case LTTNG_CONSUMER_DESTROY_CHANNEL:
+       case LTTNG_CONSUMER_CLOSE_METADATA:
+       {
+               int ret;
+
+               ret = close_metadata(msg.u.close_metadata.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
+       case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
-               unsigned long key = msg.u.destroy_channel.key;
+               uint64_t len = msg.u.push_metadata.len;
+               uint64_t target_offset = msg.u.push_metadata.target_offset;
+               uint64_t key = msg.u.push_metadata.key;
                struct lttng_consumer_channel *channel;
+               char *metadata_str;
 
-               DBG("UST consumer destroy channel key %lu", key);
+               DBG("UST consumer push metadata key %lu of len %lu", key, len);
 
                channel = consumer_find_channel(key);
                if (!channel) {
-                       ERR("UST consumer destroy channel %lu not found", key);
+                       ERR("UST consumer push metadata %lu not found", key);
                        ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-               } else {
-                       /* Protocol error if the stream list is NOT empty. */
-                       assert(!cds_list_empty(&channel->streams.head));
-                       consumer_del_channel(channel);
                }
 
+               metadata_str = zmalloc(len * sizeof(char));
+               if (!metadata_str) {
+                       PERROR("zmalloc metadata string");
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+                       goto end_msg_sessiond;
+               }
+
+               /* Tell session daemon we are ready to receive the metadata. */
                ret = consumer_send_status_msg(sock, LTTNG_OK);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
+                       goto error_fatal;
+               }
+
+               /* Wait for more data. */
+               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       goto end_nosignal;
+               }
+
+               /* Receive metadata string. */
+               ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+               if (ret < 0) {
+                       /* Session daemon is dead so return gracefully. */
                        goto end_nosignal;
                }
+
+               ret = push_metadata(channel, metadata_str, target_offset, len);
+               free(metadata_str);
+               if (ret < 0) {
+                       /* Unable to handle metadata. Notify session daemon. */
+                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto end_msg_sessiond;
+               }
+
+               goto end_msg_sessiond;
+       }
+       case LTTNG_CONSUMER_SETUP_METADATA:
+       {
+               int ret;
+
+               ret = setup_metadata(ctx, msg.u.setup_metadata.key);
+               if (ret) {
+                       ret_code = ret;
+               }
+               goto end_msg_sessiond;
        }
        default:
                break;
@@ -945,8 +1151,8 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
         * The mmap operation should write subbuf_size amount of data when network
         * streaming or the full padding (len) size when we are _not_ streaming.
         */
-       if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
-                       (ret != len && stream->net_seq_idx == -1)) {
+       if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+                       (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
                /*
                 * Display the error but continue processing to try to release the
                 * subbuffer. This is a DBG statement since any unexpected kill or
@@ -974,7 +1180,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        char full_path[PATH_MAX];
 
        /* Opening the tracefile in write mode */
-       if (stream->net_seq_idx != -1) {
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
                goto end;
        }
 
@@ -1033,3 +1239,41 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
 end:
        return ret;
 }
+
+/*
+ * Close every metadata stream wait fd of the metadata hash table. This
+ * function MUST be used very carefully so not to run into a race between the
+ * metadata thread handling streams and this function closing their wait fd.
+ *
+ * For UST, this is used when the session daemon hangs up. Its the metadata
+ * producer so calling this is safe because we are assured that no state change
+ * can occur in the metadata thread for the streams in the hash table.
+ */
+void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       assert(metadata_ht);
+       assert(metadata_ht->ht);
+
+       DBG("UST consumer closing all metadata streams");
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
+                       node.node) {
+               int fd = stream->wait_fd;
+
+               /*
+                * Whatever happens here we have to continue to try to close every
+                * streams. Let's report at least the error on failure.
+                */
+               ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+               if (ret) {
+                       ERR("Unable to close metadata stream fd %d ret %d", fd, ret);
+               }
+               DBG("Metadata wait fd %d closed", fd);
+       }
+       rcu_read_unlock();
+}
index 009fa5e5901bcfaa6cd3d3b3d59ba354cb05f06a..2c3c0e8703427fba29c37695d7e7d6b02c6b493f 100644 (file)
@@ -49,6 +49,7 @@ int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
                unsigned long *off);
 void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream);
 int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream);
+void lttng_ustconsumer_close_metadata(struct lttng_ht *ht);
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
@@ -146,6 +147,10 @@ void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
 {
        return NULL;
 }
+static inline
+void lttng_ustconsumer_close_metadata(struct lttng_ht *ht)
+{
+}
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
This page took 0.086869 seconds and 4 git commands to generate.