From: Jérémie Galarneau Date: Tue, 16 Jul 2019 19:31:36 +0000 (-0400) Subject: relayd: add remote trace chunk creation command X-Git-Tag: v2.12.0-rc1~534 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=e5add6d004793894ef4c7e047bc0f8885763b205;p=lttng-tools.git relayd: add remote trace chunk creation command Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 85c4fe9b9..11277f293 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2534,8 +2534,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr size_t path_len; struct lttng_buffer_view new_path_view; - DBG("Rotate stream received"); - if (!session || !conn->version_check_done) { ERR("Trying to rotate a stream before version check"); ret = -1; @@ -2662,6 +2660,199 @@ end_no_reply: return ret; } +static int init_session_output_directory_handle(struct relay_session *session, + struct lttng_directory_handle *handle) +{ + int ret; + /* hostname/session_name */ + char *session_directory = NULL; + /* + * base path + session_directory + * e.g. /home/user/lttng-traces/hostname/session_name + */ + char *full_session_path = NULL; + + pthread_mutex_lock(&session->lock); + ret = asprintf(&session_directory, "%s/%s", session->hostname, + session->session_name); + pthread_mutex_unlock(&session->lock); + if (ret < 0) { + PERROR("Failed to format session directory name"); + goto end; + } + + full_session_path = create_output_path(session_directory); + if (!full_session_path) { + ret = -1; + goto end; + } + + ret = utils_mkdir_recursive( + full_session_path, S_IRWXU | S_IRWXG, -1, -1); + if (ret) { + ERR("Failed to create session output path \"%s\"", + full_session_path); + goto end; + } + + ret = lttng_directory_handle_init(handle, full_session_path); + if (ret) { + goto end; + } +end: + free(session_directory); + free(full_session_path); + return ret; +} + +/* + * relay_create_trace_chunk: create a new trace chunk + */ +static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_create_trace_chunk *msg; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_buffer_view header_view; + struct lttng_buffer_view chunk_name_view; + struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL; + enum lttng_error_code reply_code = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; + struct lttng_directory_handle session_output; + + if (!session || !conn->version_check_done) { + ERR("Trying to create a trace chunk before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("Chunk creation command is unsupported before 2.11"); + ret = -1; + goto end_no_reply; + } + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk creation command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + msg->chunk_id = be64toh(msg->chunk_id); + msg->creation_timestamp = be64toh(msg->creation_timestamp); + msg->override_name_length = be32toh(msg->override_name_length); + + chunk = lttng_trace_chunk_create( + msg->chunk_id, msg->creation_timestamp); + if (!chunk) { + ERR("Failed to create trace chunk in trace chunk creation command"); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + if (msg->override_name_length) { + const char *name; + + chunk_name_view = lttng_buffer_view_from_view(payload, + sizeof(*msg), + msg->override_name_length); + name = chunk_name_view.data; + if (!name || name[msg->override_name_length - 1]) { + ERR("Failed to receive payload of chunk creation command"); + ret = -1; + reply_code = LTTNG_ERR_INVALID; + goto end; + } + + chunk_status = lttng_trace_chunk_override_name( + chunk, chunk_name_view.data); + switch (chunk_status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + break; + case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)"); + reply_code = LTTNG_ERR_INVALID; + ret = -1; + goto end; + default: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)"); + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + } + + ret = init_session_output_directory_handle( + conn->session, &session_output); + if (ret) { + reply_code = LTTNG_ERR_CREATE_DIR_FAIL; + goto end; + } + + chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + + chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + + published_chunk = sessiond_trace_chunk_registry_publish_chunk( + sessiond_trace_chunk_registry, + conn->session->sessiond_uuid, + conn->session->id, + chunk); + if (!published_chunk) { + char uuid_str[UUID_STR_LEN]; + + lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); + ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, + uuid_str, + conn->session->id, + msg->chunk_id); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + pthread_mutex_lock(&conn->session->lock); + lttng_trace_chunk_put(conn->session->current_trace_chunk); + conn->session->current_trace_chunk = published_chunk; + pthread_mutex_unlock(&conn->session->lock); + published_chunk = NULL; + +end: + reply.ret_code = htobe32((uint32_t) reply_code); + send_ret = conn->sock->ops->sendmsg(conn->sock, + &reply, + sizeof(struct lttcomm_relayd_generic_reply), + 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(published_chunk); + lttng_directory_handle_fini(&session_output); + return ret; +} + #define DBG_CMD(cmd_name, conn) \ DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); @@ -2728,6 +2919,10 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_ROTATE_STREAM", conn); ret = relay_rotate_session_stream(header, conn, payload); break; + case RELAYD_CREATE_TRACE_CHUNK: + DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); + ret = relay_create_trace_chunk(header, conn, payload); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", header->cmd); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 8ff002cea..83c30be86 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1772,7 +1772,6 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, goto error; } msg.u.create_trace_chunk.chunk_id = chunk_id; - /* Only used for logging purposes. */ if (chunk_has_local_output) { chunk_status = lttng_trace_chunk_get_chunk_directory_handle( @@ -1793,20 +1792,23 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, chunk_dirfd = lttng_directory_handle_get_dirfd( chunk_directory_handle); assert(chunk_dirfd >= 0); - } - chunk_status = lttng_trace_chunk_get_credentials(chunk, - &chunk_credentials); - if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { - /* - * Not associating credentials to a sessiond chunk is a fatal - * internal error. - */ - ret = -LTTNG_ERR_FATAL; - goto error; + chunk_status = lttng_trace_chunk_get_credentials( + chunk, &chunk_credentials); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + /* + * Not associating credentials to a sessiond chunk is a + * fatal internal error. + */ + ret = -LTTNG_ERR_FATAL; + goto error; + } + msg.u.create_trace_chunk.credentials.value.uid = + chunk_credentials.uid; + msg.u.create_trace_chunk.credentials.value.gid = + chunk_credentials.gid; + msg.u.create_trace_chunk.credentials.is_set = 1; } - msg.u.create_trace_chunk.credentials.uid = chunk_credentials.uid; - msg.u.create_trace_chunk.credentials.gid = chunk_credentials.gid; DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 8424762f7..61cbb0c1a 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -429,8 +429,6 @@ int _session_set_trace_chunk_no_lock_check(struct ltt_session *session, struct lttng_trace_chunk *current_trace_chunk; uint64_t chunk_id; enum lttng_trace_chunk_status chunk_status; - const uint64_t relayd_id = session->consumer->net_seq_index; - const bool is_local_trace = relayd_id == -1ULL; rcu_read_lock(); /* @@ -470,6 +468,12 @@ int _session_set_trace_chunk_no_lock_check(struct ltt_session *session, } if (session->ust_session) { + const uint64_t relayd_id = + session->ust_session->consumer->net_seq_index; + const bool is_local_trace = + session->ust_session->consumer->type == + CONSUMER_DST_LOCAL; + session->ust_session->current_trace_chunk = new_trace_chunk; if (is_local_trace) { enum lttng_error_code ret_error_code; @@ -495,6 +499,12 @@ int _session_set_trace_chunk_no_lock_check(struct ltt_session *session, } } if (session->kernel_session) { + const uint64_t relayd_id = + session->kernel_session->consumer->net_seq_index; + const bool is_local_trace = + session->kernel_session->consumer->type == + CONSUMER_DST_LOCAL; + session->kernel_session->current_trace_chunk = new_trace_chunk; if (is_local_trace) { enum lttng_error_code ret_error_code; diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index efafeddbc..dea7b7f51 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -4507,8 +4507,39 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( break; } } - rcu_read_unlock(); + if (relayd_id) { + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(*relayd_id); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_create_trace_chunk( + &relayd->control_sock, published_chunk); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id); + } + + if (!relayd || ret) { + enum lttcomm_return_code close_ret; + + close_ret = lttng_consumer_close_trace_chunk(relayd_id, + session_id, + chunk_id, + chunk_creation_timestamp); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, + session_id, + chunk_id); + } + + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + goto error; + } + } +error: + rcu_read_unlock(); /* Release the reference returned by the "publish" operation. */ lttng_trace_chunk_put(published_chunk); end: diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index ae909517f..edf8acb82 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1167,8 +1167,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = msg.u.create_trace_chunk.credentials.uid, - .gid = msg.u.create_trace_chunk.credentials.gid, + .uid = msg.u.create_trace_chunk.credentials.value.uid, + .gid = msg.u.create_trace_chunk.credentials.value.gid, }; const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; @@ -1221,9 +1221,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, !is_local_trace ? &relayd_id : NULL, msg.u.create_trace_chunk.session_id, msg.u.create_trace_chunk.chunk_id, - (time_t) msg.u.create_trace_chunk.creation_timestamp, + (time_t) msg.u.create_trace_chunk + .creation_timestamp, chunk_override_name, - &credentials, + msg.u.create_trace_chunk.credentials.is_set ? + &credentials : + NULL, chunk_directory_handle.is_set ? &chunk_directory_handle.value : NULL); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 90c6649ac..b065f364e 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -1195,3 +1195,91 @@ error: free(msg); return ret; } + +int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, + struct lttng_trace_chunk *chunk) +{ + int ret = 0; + enum lttng_trace_chunk_status status; + struct lttcomm_relayd_create_trace_chunk msg = {}; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_dynamic_buffer payload; + uint64_t chunk_id; + time_t creation_timestamp; + const char *chunk_name; + size_t chunk_name_length; + bool overriden_name; + + lttng_dynamic_buffer_init(&payload); + + status = lttng_trace_chunk_get_id(chunk, &chunk_id); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + + status = lttng_trace_chunk_get_creation_timestamp( + chunk, &creation_timestamp); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + + status = lttng_trace_chunk_get_name( + chunk, &chunk_name, &overriden_name); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK && + status != LTTNG_TRACE_CHUNK_STATUS_NONE) { + ret = -1; + goto end; + } + + chunk_name_length = overriden_name ? (strlen(chunk_name) + 1) : 0; + msg = (typeof(msg)){ + .chunk_id = htobe64(chunk_id), + .creation_timestamp = htobe64((uint64_t) creation_timestamp), + .override_name_length = htobe32((uint32_t) chunk_name_length), + }; + + ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg)); + if (ret) { + goto end; + } + if (chunk_name_length) { + ret = lttng_dynamic_buffer_append( + &payload, chunk_name, chunk_name_length); + if (ret) { + goto end; + } + } + + ret = send_command(sock, + RELAYD_CREATE_TRACE_CHUNK, + payload.data, + payload.size, + 0); + if (ret < 0) { + ERR("Failed to send trace chunk creation command to relay daemon"); + goto end; + } + + ret = recv_reply(sock, &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to receive relay daemon trace chunk creation command reply"); + goto end; + } + + reply.ret_code = be32toh(reply.ret_code); + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd trace chunk create replied error %d", + reply.ret_code); + } else { + ret = 0; + DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64, + chunk_id); + } + +end: + lttng_dynamic_buffer_reset(&payload); + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index c8a216c54..695ef6ffa 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -59,5 +59,7 @@ int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, uint64_t new_chunk_id, uint64_t seq_num); +int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, + struct lttng_trace_chunk *chunk); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index fd456a3a1..f86dde3d1 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -237,4 +237,13 @@ struct lttcomm_relayd_rotate_stream { char new_pathname[]; } LTTNG_PACKED; +struct lttcomm_relayd_create_trace_chunk { + uint64_t chunk_id; + /* Seconds since EPOCH. */ + uint64_t creation_timestamp; + /* Includes trailing NULL. */ + uint32_t override_name_length; + char override_name[]; +} LTTNG_PACKED; + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index d9177cf37..d5009740e 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -133,12 +133,8 @@ enum lttcomm_relayd_command { RELAYD_RESET_METADATA = 17, /* Ask the relay to rotate a stream file (2.11+) */ RELAYD_ROTATE_STREAM = 18, - /* Rename a chunk after the rotation is completed (2.11+) */ - RELAYD_ROTATE_RENAME = 19, - /* Check if a chunk has data pending (2.11+) */ - RELAYD_ROTATE_PENDING = 20, - /* Create a folder on the relayd FS (2.11+) */ - RELAYD_MKDIR = 21, + /* Ask the relay to create a trace chunk (2.11+) */ + RELAYD_CREATE_TRACE_CHUNK = 19, }; /* @@ -639,10 +635,10 @@ struct lttcomm_consumer_msg { uint64_t session_id; uint64_t chunk_id; uint64_t creation_timestamp; - struct { + LTTNG_OPTIONAL_COMM(struct { uint32_t uid; uint32_t gid; - } LTTNG_PACKED credentials; + } LTTNG_PACKED ) LTTNG_PACKED credentials; } LTTNG_PACKED create_trace_chunk; struct { LTTNG_OPTIONAL_COMM(uint64_t) LTTNG_PACKED relayd_id; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index ff9d31d52..0856cf0b9 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2008,8 +2008,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = msg.u.create_trace_chunk.credentials.uid, - .gid = msg.u.create_trace_chunk.credentials.gid, + .uid = msg.u.create_trace_chunk.credentials.value.uid, + .gid = msg.u.create_trace_chunk.credentials.value.gid, }; const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; @@ -2062,9 +2062,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, !is_local_trace ? &relayd_id : NULL, msg.u.create_trace_chunk.session_id, msg.u.create_trace_chunk.chunk_id, - (time_t) msg.u.create_trace_chunk.creation_timestamp, + (time_t) msg.u.create_trace_chunk + .creation_timestamp, chunk_override_name, - &credentials, + msg.u.create_trace_chunk.credentials.is_set ? + &credentials : + NULL, chunk_directory_handle.is_set ? &chunk_directory_handle.value : NULL);