Channel rotate pipe between sessiond and the consumers
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 13 Dec 2017 18:24:34 +0000 (13:24 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 12 Mar 2018 16:03:29 +0000 (12:03 -0400)
This new pipe is used by the consumers to inform the session daemon (in
the rotation_thread) that it has finished the rotation of a channel. In
this patch, we only setup the pipe between the daemons, it is not yet in
use.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/main.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 8727ce3776a1d85d80a9bdef7ae09110cedb256e..35d1b8aa2d9b5aaa31d217f790ec1655fa1520c0 100644 (file)
@@ -1064,37 +1064,70 @@ error:
        return ret;
 }
 
-int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
-               int pipe)
+static
+int consumer_send_pipe(struct consumer_socket *consumer_sock,
+               enum lttng_consumer_command cmd, int pipe)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
+       const char *pipe_name;
+       const char *command_name;
+
+       switch (cmd) {
+       case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+               pipe_name = "channel monitor";
+               command_name = "SET_CHANNEL_MONITOR_PIPE";
+               break;
+       case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
+               pipe_name = "channel rotate";
+               command_name = "SET_CHANNEL_ROTATE_PIPE";
+               break;
+       default:
+               ERR("Unexpected command received in %s (cmd = %d)", __func__,
+                               (int) cmd);
+               abort();
+       }
 
        /* Code flow error. Safety net. */
 
        memset(&msg, 0, sizeof(msg));
-       msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE;
+       msg.cmd_type = cmd;
 
        pthread_mutex_lock(consumer_sock->lock);
-       DBG3("Sending set_channel_monitor_pipe command to consumer");
+       DBG3("Sending %s command to consumer", command_name);
        ret = consumer_send_msg(consumer_sock, &msg);
        if (ret < 0) {
                goto error;
        }
 
-       DBG3("Sending channel monitoring pipe %d to consumer on socket %d",
+       DBG3("Sending %s pipe %d to consumer on socket %d",
+                       pipe_name,
                        pipe, *consumer_sock->fd_ptr);
        ret = consumer_send_fds(consumer_sock, &pipe, 1);
        if (ret < 0) {
                goto error;
        }
 
-       DBG2("Channel monitoring pipe successfully sent");
+       DBG2("%s pipe successfully sent", pipe_name);
 error:
        pthread_mutex_unlock(consumer_sock->lock);
        return ret;
 }
 
+int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
+               int pipe)
+{
+       return consumer_send_pipe(consumer_sock,
+                       LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe);
+}
+
+int consumer_send_channel_rotate_pipe(struct consumer_socket *consumer_sock,
+               int pipe)
+{
+       return consumer_send_pipe(consumer_sock,
+                       LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE, pipe);
+}
+
 /*
  * Set consumer subdirectory using the session name and a generated datetime if
  * needed. This is appended to the current subdirectory.
index 691aad9f6a1dd7f0992c16147c4dcc55f1bb4f2a..36e7c83d5bbaab2c6f2397461756327ce5d1fc29 100644 (file)
@@ -98,6 +98,11 @@ struct consumer_data {
         * consumer.
         */
        int channel_monitor_pipe;
+       /*
+        * Write-end of the channel rotation pipe to be passed to the
+        * consumer.
+        */
+       int channel_rotate_pipe;
        /*
         * The metadata socket object is handled differently and only created
         * locally in this object thus it's the only reference available in the
@@ -231,6 +236,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                char *session_name, char *hostname, int session_live_timer);
 int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
                int pipe);
+int consumer_send_channel_rotate_pipe(struct consumer_socket *consumer_sock,
+               int pipe);
 int consumer_send_destroy_relayd(struct consumer_socket *sock,
                struct consumer_output *consumer);
 int consumer_recv_status_reply(struct consumer_socket *sock);
index a6c45c2ed90942e03ba45a36f79a9ca6af538112..532e345c08ded85521bce34437d911d58e6156ab 100644 (file)
@@ -106,6 +106,7 @@ static struct consumer_data kconsumer_data = {
        .err_sock = -1,
        .cmd_sock = -1,
        .channel_monitor_pipe = -1,
+       .channel_rotate_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -116,6 +117,7 @@ static struct consumer_data ustconsumer64_data = {
        .err_sock = -1,
        .cmd_sock = -1,
        .channel_monitor_pipe = -1,
+       .channel_rotate_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -126,6 +128,7 @@ static struct consumer_data ustconsumer32_data = {
        .err_sock = -1,
        .cmd_sock = -1,
        .channel_monitor_pipe = -1,
+       .channel_rotate_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -471,6 +474,24 @@ static void close_consumer_sockets(void)
                        PERROR("UST consumerd64 channel monitor pipe close");
                }
        }
+       if (kconsumer_data.channel_rotate_pipe >= 0) {
+               ret = close(kconsumer_data.channel_rotate_pipe);
+               if (ret < 0) {
+                       PERROR("kernel consumer channel rotate pipe close");
+               }
+       }
+       if (ustconsumer32_data.channel_rotate_pipe >= 0) {
+               ret = close(ustconsumer32_data.channel_rotate_pipe);
+               if (ret < 0) {
+                       PERROR("UST consumerd32 channel rotate pipe close");
+               }
+       }
+       if (ustconsumer64_data.channel_rotate_pipe >= 0) {
+               ret = close(ustconsumer64_data.channel_rotate_pipe);
+               if (ret < 0) {
+                       PERROR("UST consumerd64 channel rotate pipe close");
+               }
+       }
 }
 
 /*
@@ -1266,8 +1287,9 @@ restart:
        health_code_update();
 
        /*
-        * Transfer the write-end of the channel monitoring pipe to the
-        * by issuing a SET_CHANNEL_MONITOR_PIPE command.
+        * Transfer the write-end of the channel monitoring and rotate pipe
+        * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE and
+        * SET_CHANNEL_ROTATE_PIPE commands.
         */
        cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
        if (!cmd_socket_wrapper) {
@@ -1280,6 +1302,13 @@ restart:
        if (ret) {
                goto error;
        }
+
+       ret = consumer_send_channel_rotate_pipe(cmd_socket_wrapper,
+                       consumer_data->channel_rotate_pipe);
+       if (ret) {
+               goto error;
+       }
+
        /* Discard the socket wrapper as it is no longer needed. */
        consumer_destroy_socket(cmd_socket_wrapper);
        cmd_socket_wrapper = NULL;
@@ -5447,6 +5476,9 @@ int main(int argc, char **argv)
                        *ust64_channel_monitor_pipe = NULL,
                        *kernel_channel_monitor_pipe = NULL;
        bool notification_thread_running = false;
+       struct lttng_pipe *ust32_channel_rotate_pipe = NULL,
+                       *ust64_channel_rotate_pipe = NULL,
+                       *kernel_channel_rotate_pipe = NULL;
 
        init_kernel_workarounds();
 
@@ -5594,6 +5626,19 @@ int main(int argc, char **argv)
                        retval = -1;
                        goto exit_init_data;
                }
+               kernel_channel_rotate_pipe = lttng_pipe_open(0);
+               if (!kernel_channel_rotate_pipe) {
+                       ERR("Failed to create kernel consumer channel rotate pipe");
+                       retval = -1;
+                       goto exit_init_data;
+               }
+               kconsumer_data.channel_rotate_pipe =
+                               lttng_pipe_release_writefd(
+                                       kernel_channel_rotate_pipe);
+               if (kconsumer_data.channel_rotate_pipe < 0) {
+                       retval = -1;
+                       goto exit_init_data;
+               }
        }
 
        lockfile_fd = create_lockfile();
@@ -5618,6 +5663,19 @@ int main(int argc, char **argv)
                retval = -1;
                goto exit_init_data;
        }
+       ust32_channel_rotate_pipe = lttng_pipe_open(0);
+       if (!ust32_channel_rotate_pipe) {
+               ERR("Failed to create 32-bit user space consumer channel rotate pipe");
+               retval = -1;
+               goto exit_init_data;
+       }
+       ustconsumer32_data.channel_rotate_pipe = lttng_pipe_release_writefd(
+                       ust32_channel_rotate_pipe);
+       if (ustconsumer32_data.channel_rotate_pipe < 0) {
+               retval = -1;
+               goto exit_init_data;
+       }
+
 
        ust64_channel_monitor_pipe = lttng_pipe_open(0);
        if (!ust64_channel_monitor_pipe) {
@@ -5631,6 +5689,18 @@ int main(int argc, char **argv)
                retval = -1;
                goto exit_init_data;
        }
+       ust64_channel_rotate_pipe = lttng_pipe_open(0);
+       if (!ust64_channel_rotate_pipe) {
+               ERR("Failed to create 64-bit user space consumer channel rotate pipe");
+               retval = -1;
+               goto exit_init_data;
+       }
+       ustconsumer64_data.channel_rotate_pipe = lttng_pipe_release_writefd(
+                       ust64_channel_rotate_pipe);
+       if (ustconsumer64_data.channel_rotate_pipe < 0) {
+               retval = -1;
+               goto exit_init_data;
+       }
 
        /*
         * See if daemon already exist.
@@ -6049,6 +6119,9 @@ exit_init_data:
        lttng_pipe_destroy(ust32_channel_monitor_pipe);
        lttng_pipe_destroy(ust64_channel_monitor_pipe);
        lttng_pipe_destroy(kernel_channel_monitor_pipe);
+       lttng_pipe_destroy(ust32_channel_rotate_pipe);
+       lttng_pipe_destroy(ust64_channel_rotate_pipe);
+       lttng_pipe_destroy(kernel_channel_rotate_pipe);
 exit_ht_cleanup:
 
        health_app_destroy(health_sessiond);
index 8a780e7f835eb49abd721c027e4c6f1dc6df9a54..1187942577366f8f061ea1795b67b5e5e6e3ce0c 100644 (file)
@@ -62,6 +62,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_LOST_PACKETS,
        LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
        LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
+       LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE,
        LTTNG_CONSUMER_ROTATE_RENAME,
        LTTNG_CONSUMER_MKDIR,
 };
@@ -565,6 +566,11 @@ struct lttng_consumer_local_data {
         * to the session daemon (write-only).
         */
        int channel_monitor_pipe;
+       /*
+        * Pipe used to inform the session daemon that a stream has finished
+        * its rotation (write-only).
+        */
+       int channel_rotate_pipe;
 };
 
 /*
index 539d80b0b4a32223054a08be4116fda25f7986eb..f7704b46564a63273dabe03b9e181b4846ed4292 100644 (file)
@@ -1078,6 +1078,47 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
+       case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
+       {
+               int channel_rotate_pipe;
+               int flags;
+
+               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+               /* Successfully received the command's type. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       goto error_fatal;
+               }
+
+               ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1);
+               if (ret != (ssize_t) sizeof(channel_rotate_pipe)) {
+                       ERR("Failed to receive channel rotate pipe");
+                       goto error_fatal;
+               }
+
+               DBG("Received channel rotate pipe (%d)", channel_rotate_pipe);
+               ctx->channel_rotate_pipe = channel_rotate_pipe;
+               /* Set the pipe as non-blocking. */
+               ret = fcntl(channel_rotate_pipe, F_GETFL, 0);
+               if (ret == -1) {
+                       PERROR("fcntl get flags of the channel rotate pipe");
+                       goto error_fatal;
+               }
+               flags = ret;
+
+               ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK);
+               if (ret == -1) {
+                       PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe");
+                       goto error_fatal;
+               }
+               DBG("Channel rotate pipe set as non-blocking");
+               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       goto error_fatal;
+               }
+               break;
+       }
        case LTTNG_CONSUMER_ROTATE_RENAME:
        {
                DBG("Consumer rename session %" PRIu64 " after rotation, old path = \"%s\", new path = \"%s\"",
index 775cb1766f62e4a356cf49ca739351c4cedc7dac..5e1ff896c4e37102bf895819717eda89aefbbba3 100644 (file)
@@ -1918,6 +1918,47 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
+       {
+               int channel_rotate_pipe;
+               int flags;
+
+               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+               /* Successfully received the command's type. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       goto error_fatal;
+               }
+
+               ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1);
+               if (ret != sizeof(channel_rotate_pipe)) {
+                       ERR("Failed to receive channel rotate pipe");
+                       goto error_fatal;
+               }
+
+               DBG("Received channel rotate pipe (%d)", channel_rotate_pipe);
+               ctx->channel_rotate_pipe = channel_rotate_pipe;
+               /* Set the pipe as non-blocking. */
+               ret = fcntl(channel_rotate_pipe, F_GETFL, 0);
+               if (ret == -1) {
+                       PERROR("fcntl get flags of the channel rotate pipe");
+                       goto error_fatal;
+               }
+               flags = ret;
+
+               ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK);
+               if (ret == -1) {
+                       PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe");
+                       goto error_fatal;
+               }
+               DBG("Channel rotate pipe set as non-blocking");
+               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       goto error_fatal;
+               }
+               break;
+       }
        case LTTNG_CONSUMER_ROTATE_RENAME:
        {
                DBG("Consumer rename session %" PRIu64 " after rotation",
This page took 0.031723 seconds and 4 git commands to generate.