relayd: add remote trace chunk creation command
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 16 Jul 2019 19:31:36 +0000 (15:31 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 25 Jul 2019 19:51:47 +0000 (15:51 -0400)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/session.c
src/common/consumer/consumer.c
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 85c4fe9b9aca282ca2d35af5bfa18c910c261b55..11277f29346e99104cfbedd6453ace99a76493f6 100644 (file)
@@ -2534,8 +2534,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
        size_t path_len;
        struct lttng_buffer_view new_path_view;
 
-       DBG("Rotate stream received");
-
        if (!session || !conn->version_check_done) {
                ERR("Trying to rotate a stream before version check");
                ret = -1;
@@ -2662,6 +2660,199 @@ end_no_reply:
        return ret;
 }
 
+static int init_session_output_directory_handle(struct relay_session *session,
+               struct lttng_directory_handle *handle)
+{
+       int ret;
+       /* hostname/session_name */
+       char *session_directory = NULL;
+       /*
+        * base path + session_directory
+        * e.g. /home/user/lttng-traces/hostname/session_name
+        */
+       char *full_session_path = NULL;
+
+       pthread_mutex_lock(&session->lock);
+       ret = asprintf(&session_directory, "%s/%s", session->hostname,
+                       session->session_name);
+       pthread_mutex_unlock(&session->lock);
+       if (ret < 0) {
+               PERROR("Failed to format session directory name");
+               goto end;
+       }
+
+       full_session_path = create_output_path(session_directory);
+       if (!full_session_path) {
+               ret = -1;
+               goto end;
+       }
+
+       ret = utils_mkdir_recursive(
+                       full_session_path, S_IRWXU | S_IRWXG, -1, -1);
+       if (ret) {
+               ERR("Failed to create session output path \"%s\"",
+                               full_session_path);
+               goto end;
+       }
+
+       ret = lttng_directory_handle_init(handle, full_session_path);
+       if (ret) {
+               goto end;
+       }
+end:
+       free(session_directory);
+       free(full_session_path);
+       return ret;
+}
+
+/*
+ * relay_create_trace_chunk: create a new trace chunk
+ */
+static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn,
+               const struct lttng_buffer_view *payload)
+{
+       int ret = 0;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_create_trace_chunk *msg;
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct lttng_buffer_view header_view;
+       struct lttng_buffer_view chunk_name_view;
+       struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL;
+       enum lttng_error_code reply_code = LTTNG_OK;
+       enum lttng_trace_chunk_status chunk_status;
+       struct lttng_directory_handle session_output;
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to create a trace chunk before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Chunk creation command is unsupported before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+       if (!header_view.data) {
+               ERR("Failed to receive payload of chunk creation command");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       /* Convert to host endianness. */
+       msg = (typeof(msg)) header_view.data;
+       msg->chunk_id = be64toh(msg->chunk_id);
+       msg->creation_timestamp = be64toh(msg->creation_timestamp);
+       msg->override_name_length = be32toh(msg->override_name_length);
+
+       chunk = lttng_trace_chunk_create(
+                       msg->chunk_id, msg->creation_timestamp);
+       if (!chunk) {
+               ERR("Failed to create trace chunk in trace chunk creation command");
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       if (msg->override_name_length) {
+               const char *name;
+
+               chunk_name_view = lttng_buffer_view_from_view(payload,
+                               sizeof(*msg),
+                               msg->override_name_length);
+               name = chunk_name_view.data;
+               if (!name || name[msg->override_name_length - 1]) {
+                       ERR("Failed to receive payload of chunk creation command");
+                       ret = -1;
+                       reply_code = LTTNG_ERR_INVALID;
+                       goto end;
+               }
+
+               chunk_status = lttng_trace_chunk_override_name(
+                               chunk, chunk_name_view.data);
+               switch (chunk_status) {
+               case LTTNG_TRACE_CHUNK_STATUS_OK:
+                       break;
+               case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT:
+                       ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)");
+                       reply_code = LTTNG_ERR_INVALID;
+                       ret = -1;
+                       goto end;
+               default:
+                       ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)");
+                       reply_code = LTTNG_ERR_UNK;
+                       ret = -1;
+                       goto end;
+               }
+       }
+
+       ret = init_session_output_directory_handle(
+                       conn->session, &session_output);
+       if (ret) {
+               reply_code = LTTNG_ERR_CREATE_DIR_FAIL;
+               goto end;
+       }
+
+       chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               reply_code = LTTNG_ERR_UNK;
+               ret = -1;
+               goto end;
+       }
+
+       chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               reply_code = LTTNG_ERR_UNK;
+               ret = -1;
+               goto end;
+       }
+
+       published_chunk = sessiond_trace_chunk_registry_publish_chunk(
+                       sessiond_trace_chunk_registry,
+                       conn->session->sessiond_uuid,
+                       conn->session->id,
+                       chunk);
+       if (!published_chunk) {
+               char uuid_str[UUID_STR_LEN];
+
+               lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+               ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+                               uuid_str,
+                               conn->session->id,
+                               msg->chunk_id);
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       pthread_mutex_lock(&conn->session->lock);
+       lttng_trace_chunk_put(conn->session->current_trace_chunk);
+       conn->session->current_trace_chunk = published_chunk;
+       pthread_mutex_unlock(&conn->session->lock);
+       published_chunk = NULL;
+
+end:
+       reply.ret_code = htobe32((uint32_t) reply_code);
+       send_ret = conn->sock->ops->sendmsg(conn->sock,
+                       &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply),
+                       0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+end_no_reply:
+       lttng_trace_chunk_put(chunk);
+       lttng_trace_chunk_put(published_chunk);
+       lttng_directory_handle_fini(&session_output);
+       return ret;
+}
+
 #define DBG_CMD(cmd_name, conn) \
                DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
 
@@ -2728,6 +2919,10 @@ static int relay_process_control_command(struct relay_connection *conn,
                DBG_CMD("RELAYD_ROTATE_STREAM", conn);
                ret = relay_rotate_session_stream(header, conn, payload);
                break;
+       case RELAYD_CREATE_TRACE_CHUNK:
+               DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
+               ret = relay_create_trace_chunk(header, conn, payload);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", header->cmd);
index 8ff002cea1598013d9c460c11e07ba0d3e32c598..83c30be86edfee4c2d1443b6ef77ae1f44808887 100644 (file)
@@ -1772,7 +1772,6 @@ int consumer_create_trace_chunk(struct consumer_socket *socket,
                goto error;
        }
        msg.u.create_trace_chunk.chunk_id = chunk_id;
-       /* Only used for logging purposes. */
 
        if (chunk_has_local_output) {
                chunk_status = lttng_trace_chunk_get_chunk_directory_handle(
@@ -1793,20 +1792,23 @@ int consumer_create_trace_chunk(struct consumer_socket *socket,
                chunk_dirfd = lttng_directory_handle_get_dirfd(
                                chunk_directory_handle);
                assert(chunk_dirfd >= 0);
-       }
 
-       chunk_status = lttng_trace_chunk_get_credentials(chunk,
-                       &chunk_credentials);
-       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               /*
-                * Not associating credentials to a sessiond chunk is a fatal
-                * internal error.
-                */
-               ret = -LTTNG_ERR_FATAL;
-               goto error;
+               chunk_status = lttng_trace_chunk_get_credentials(
+                               chunk, &chunk_credentials);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       /*
+                        * Not associating credentials to a sessiond chunk is a
+                        * fatal internal error.
+                        */
+                       ret = -LTTNG_ERR_FATAL;
+                       goto error;
+               }
+               msg.u.create_trace_chunk.credentials.value.uid =
+                               chunk_credentials.uid;
+               msg.u.create_trace_chunk.credentials.value.gid =
+                               chunk_credentials.gid;
+               msg.u.create_trace_chunk.credentials.is_set = 1;
        }
-       msg.u.create_trace_chunk.credentials.uid = chunk_credentials.uid;
-       msg.u.create_trace_chunk.credentials.gid = chunk_credentials.gid;
 
        DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64
                        ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
index 8424762f7e47f9c940a5269b669e9c781ac9d59a..61cbb0c1af2f3e10abed78c1394de238706597dd 100644 (file)
@@ -429,8 +429,6 @@ int _session_set_trace_chunk_no_lock_check(struct ltt_session *session,
        struct lttng_trace_chunk *current_trace_chunk;
        uint64_t chunk_id;
        enum lttng_trace_chunk_status chunk_status;
-       const uint64_t relayd_id = session->consumer->net_seq_index;
-       const bool is_local_trace = relayd_id  == -1ULL;
 
        rcu_read_lock();
        /*
@@ -470,6 +468,12 @@ int _session_set_trace_chunk_no_lock_check(struct ltt_session *session,
        }
 
        if (session->ust_session) {
+               const uint64_t relayd_id =
+                               session->ust_session->consumer->net_seq_index;
+               const bool is_local_trace =
+                               session->ust_session->consumer->type ==
+                               CONSUMER_DST_LOCAL;
+
                session->ust_session->current_trace_chunk = new_trace_chunk;
                 if (is_local_trace) {
                        enum lttng_error_code ret_error_code;
@@ -495,6 +499,12 @@ int _session_set_trace_chunk_no_lock_check(struct ltt_session *session,
                 }
         }
        if (session->kernel_session) {
+               const uint64_t relayd_id =
+                               session->kernel_session->consumer->net_seq_index;
+               const bool is_local_trace =
+                               session->kernel_session->consumer->type ==
+                               CONSUMER_DST_LOCAL;
+
                session->kernel_session->current_trace_chunk = new_trace_chunk;
                if (is_local_trace) {
                        enum lttng_error_code ret_error_code;
index efafeddbc3543e0d26c5cd2f66f046663c5b8572..dea7b7f51287022ea360ba1d68d24fbd9f541d9e 100644 (file)
@@ -4507,8 +4507,39 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                        break;
                }
        }
-       rcu_read_unlock();
 
+       if (relayd_id) {
+               struct consumer_relayd_sock_pair *relayd;
+
+               relayd = consumer_find_relayd(*relayd_id);
+               if (relayd) {
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       ret = relayd_create_trace_chunk(
+                                       &relayd->control_sock, published_chunk);
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               } else {
+                       ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+               }
+
+               if (!relayd || ret) {
+                       enum lttcomm_return_code close_ret;
+
+                       close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+                                       session_id,
+                                       chunk_id,
+                                       chunk_creation_timestamp);
+                       if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+                               ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+                                               session_id,
+                                               chunk_id);
+                       }
+
+                       ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+                       goto error;
+               }
+       }
+error:
+       rcu_read_unlock();
        /* Release the reference returned by the "publish" operation. */
        lttng_trace_chunk_put(published_chunk);
 end:
index ae909517fb31f84ba35c2afd9ddfc809c4e13693..edf8acb8286cedc3c8649b6c64313ccb0ff385fc 100644 (file)
@@ -1167,8 +1167,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
        {
                const struct lttng_credentials credentials = {
-                       .uid = msg.u.create_trace_chunk.credentials.uid,
-                       .gid = msg.u.create_trace_chunk.credentials.gid,
+                       .uid = msg.u.create_trace_chunk.credentials.value.uid,
+                       .gid = msg.u.create_trace_chunk.credentials.value.gid,
                };
                const bool is_local_trace =
                                !msg.u.create_trace_chunk.relayd_id.is_set;
@@ -1221,9 +1221,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                !is_local_trace ? &relayd_id : NULL,
                                msg.u.create_trace_chunk.session_id,
                                msg.u.create_trace_chunk.chunk_id,
-                               (time_t) msg.u.create_trace_chunk.creation_timestamp,
+                               (time_t) msg.u.create_trace_chunk
+                                               .creation_timestamp,
                                chunk_override_name,
-                               &credentials,
+                               msg.u.create_trace_chunk.credentials.is_set ?
+                                               &credentials :
+                                               NULL,
                                chunk_directory_handle.is_set ?
                                                &chunk_directory_handle.value :
                                                NULL);
index 90c6649acc3f7b26cac59b2c89eb706eb74b0d72..b065f364eb67f3e06410b9c3043082abdd8f85ad 100644 (file)
@@ -1195,3 +1195,91 @@ error:
        free(msg);
        return ret;
 }
+
+int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
+               struct lttng_trace_chunk *chunk)
+{
+       int ret = 0;
+       enum lttng_trace_chunk_status status;
+       struct lttcomm_relayd_create_trace_chunk msg = {};
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct lttng_dynamic_buffer payload;
+       uint64_t chunk_id;
+       time_t creation_timestamp;
+       const char *chunk_name;
+       size_t chunk_name_length;
+       bool overriden_name;
+
+       lttng_dynamic_buffer_init(&payload);
+
+       status = lttng_trace_chunk_get_id(chunk, &chunk_id);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ret = -1;
+               goto end;
+       }
+
+       status = lttng_trace_chunk_get_creation_timestamp(
+                       chunk, &creation_timestamp);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ret = -1;
+               goto end;
+       }
+
+       status = lttng_trace_chunk_get_name(
+                       chunk, &chunk_name, &overriden_name);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
+                       status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
+               ret = -1;
+               goto end;
+       }
+
+       chunk_name_length = overriden_name ? (strlen(chunk_name) + 1) : 0;
+       msg = (typeof(msg)){
+               .chunk_id = htobe64(chunk_id),
+               .creation_timestamp = htobe64((uint64_t) creation_timestamp),
+               .override_name_length = htobe32((uint32_t) chunk_name_length),
+       };
+
+       ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
+       if (ret) {
+               goto end;
+       }
+       if (chunk_name_length) {
+               ret = lttng_dynamic_buffer_append(
+                               &payload, chunk_name, chunk_name_length);
+               if (ret) {
+                       goto end;
+               }
+       }
+
+       ret = send_command(sock,
+                       RELAYD_CREATE_TRACE_CHUNK,
+                       payload.data,
+                       payload.size,
+                       0);
+       if (ret < 0) {
+               ERR("Failed to send trace chunk creation command to relay daemon");
+               goto end;
+       }
+
+       ret = recv_reply(sock, &reply, sizeof(reply));
+       if (ret < 0) {
+               ERR("Failed to receive relay daemon trace chunk creation command reply");
+               goto end;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd trace chunk create replied error %d",
+                               reply.ret_code);
+       } else {
+               ret = 0;
+               DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
+                               chunk_id);
+       }
+
+end:
+       lttng_dynamic_buffer_reset(&payload);
+       return ret;
+}
index c8a216c543a0a984d4d368a81f1204723b500f0a..695ef6ffa9cf723997bc2012a16c2bfdf8c03dad 100644 (file)
@@ -59,5 +59,7 @@ int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
 int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
                uint64_t new_chunk_id, uint64_t seq_num);
+int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
+               struct lttng_trace_chunk *chunk);
 
 #endif /* _RELAYD_H */
index fd456a3a1502311423c18f1b27d887b9d107072d..f86dde3d1243d70f26769e9efaa893fed1a89a66 100644 (file)
@@ -237,4 +237,13 @@ struct lttcomm_relayd_rotate_stream {
        char new_pathname[];
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_create_trace_chunk {
+       uint64_t chunk_id;
+       /* Seconds since EPOCH. */
+       uint64_t creation_timestamp;
+       /* Includes trailing NULL. */
+       uint32_t override_name_length;
+       char override_name[];
+} LTTNG_PACKED;
+
 #endif /* _RELAYD_COMM */
index d9177cf37d8554c723e94a80a448839071042fc6..d5009740e45da8ad96263226d4db80929f0fed25 100644 (file)
@@ -133,12 +133,8 @@ enum lttcomm_relayd_command {
        RELAYD_RESET_METADATA               = 17,
        /* Ask the relay to rotate a stream file (2.11+) */
        RELAYD_ROTATE_STREAM                = 18,
-       /* Rename a chunk after the rotation is completed (2.11+) */
-       RELAYD_ROTATE_RENAME                = 19,
-       /* Check if a chunk has data pending (2.11+) */
-       RELAYD_ROTATE_PENDING               = 20,
-       /* Create a folder on the relayd FS (2.11+) */
-       RELAYD_MKDIR                        = 21,
+       /* Ask the relay to create a trace chunk (2.11+) */
+       RELAYD_CREATE_TRACE_CHUNK           = 19,
 };
 
 /*
@@ -639,10 +635,10 @@ struct lttcomm_consumer_msg {
                        uint64_t session_id;
                        uint64_t chunk_id;
                        uint64_t creation_timestamp;
-                       struct {
+                       LTTNG_OPTIONAL_COMM(struct {
                                uint32_t uid;
                                uint32_t gid;
-                       } LTTNG_PACKED credentials;
+                       } LTTNG_PACKED ) LTTNG_PACKED credentials;
                } LTTNG_PACKED create_trace_chunk;
                struct {
                        LTTNG_OPTIONAL_COMM(uint64_t) LTTNG_PACKED relayd_id;
index ff9d31d5287a92b29bf6ac16bff0e6a17876d837..0856cf0b9d3abf0858ca693b664ab5e542e5bbfb 100644 (file)
@@ -2008,8 +2008,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
        {
                const struct lttng_credentials credentials = {
-                       .uid = msg.u.create_trace_chunk.credentials.uid,
-                       .gid = msg.u.create_trace_chunk.credentials.gid,
+                       .uid = msg.u.create_trace_chunk.credentials.value.uid,
+                       .gid = msg.u.create_trace_chunk.credentials.value.gid,
                };
                const bool is_local_trace =
                                !msg.u.create_trace_chunk.relayd_id.is_set;
@@ -2062,9 +2062,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                !is_local_trace ? &relayd_id : NULL,
                                msg.u.create_trace_chunk.session_id,
                                msg.u.create_trace_chunk.chunk_id,
-                               (time_t) msg.u.create_trace_chunk.creation_timestamp,
+                               (time_t) msg.u.create_trace_chunk
+                                               .creation_timestamp,
                                chunk_override_name,
-                               &credentials,
+                               msg.u.create_trace_chunk.credentials.is_set ?
+                                               &credentials :
+                                               NULL,
                                chunk_directory_handle.is_set ?
                                                &chunk_directory_handle.value :
                                                NULL);
This page took 0.035405 seconds and 4 git commands to generate.