From: Mathieu Desnoyers Date: Fri, 23 Aug 2019 00:59:49 +0000 (-0700) Subject: relayd protocol: reply path for close chunk and create session 2.11 X-Git-Tag: v2.11.0-rc3~35 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=41b235984d17069c74c21686fb8faf0aaf47d91c;p=lttng-tools.git relayd protocol: reply path for close chunk and create session 2.11 Since the relay daemon is expected to be able to move the target destination of the trace, reply the chunk and session path so session can have relevant data rather than guessing their location. The session daemon now use this information to send the path back to the client for rotation and destroy commands, as well as for rotation completion notifications. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index a2904eb08..09e1b7963 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1068,7 +1068,7 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, int ret = 0; ssize_t send_ret; struct relay_session *session = NULL; - struct lttcomm_relayd_status_session reply = {}; + struct lttcomm_relayd_create_session_reply_2_11 reply = {}; char session_name[LTTNG_NAME_MAX] = {}; char hostname[LTTNG_HOST_NAME_MAX] = {}; uint32_t live_timer = 0; @@ -1080,6 +1080,9 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, LTTNG_OPTIONAL(uint64_t) id_sessiond = {}; LTTNG_OPTIONAL(uint64_t) current_chunk_id = {}; LTTNG_OPTIONAL(time_t) creation_time = {}; + struct lttng_dynamic_buffer reply_payload; + + lttng_dynamic_buffer_init(&reply_payload); if (conn->minor < 4) { /* From 2.1 to 2.3 */ @@ -1133,24 +1136,56 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, conn->session = session; DBG("Created session %" PRIu64, session->id); - reply.session_id = htobe64(session->id); + reply.generic.session_id = htobe64(session->id); send_reply: if (ret < 0) { - reply.ret_code = htobe32(LTTNG_ERR_FATAL); + reply.generic.ret_code = htobe32(LTTNG_ERR_FATAL); } else { - reply.ret_code = htobe32(LTTNG_OK); + reply.generic.ret_code = htobe32(LTTNG_OK); } - send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (send_ret < (ssize_t) sizeof(reply)) { - ERR("Failed to send \"create session\" command reply (ret = %zd)", - send_ret); + if (conn->minor < 11) { + /* From 2.1 to 2.10 */ + ret = lttng_dynamic_buffer_append(&reply_payload, + &reply.generic, sizeof(reply.generic)); + if (ret) { + ERR("Failed to append \"create session\" command reply header to payload buffer"); + ret = -1; + goto end; + } + } else { + const uint32_t output_path_length = + strlen(session->output_path) + 1; + + reply.output_path_length = htobe32(output_path_length); + ret = lttng_dynamic_buffer_append(&reply_payload, &reply, + sizeof(reply)); + if (ret) { + ERR("Failed to append \"create session\" command reply header to payload buffer"); + goto end; + } + + ret = lttng_dynamic_buffer_append(&reply_payload, + session->output_path, output_path_length); + if (ret) { + ERR("Failed to append \"create session\" command reply path to payload buffer"); + goto end; + } + } + + send_ret = conn->sock->ops->sendmsg(conn->sock, reply_payload.data, + reply_payload.size, 0); + if (send_ret < (ssize_t) reply_payload.size) { + ERR("Failed to send \"create session\" command reply of %zu bytes (ret = %zd)", + reply_payload.size, send_ret); ret = -1; } +end: if (ret < 0 && session) { session_put(session); } + lttng_dynamic_buffer_reset(&reply_payload); return ret; } @@ -2195,6 +2230,7 @@ static int relay_rotate_session_streams( } else { chunk_id_str = chunk_id_buf; } + session->has_rotated = true; } DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"", @@ -2266,87 +2302,14 @@ static int init_session_output_directory_handle(struct relay_session *session, struct lttng_directory_handle *handle) { int ret; - /* - * session_directory: - * - * if base_path is \0' - * hostname/session_name - * else - * hostname/base_path - */ - char *session_directory = NULL; /* * relayd_output_path/session_directory * e.g. /home/user/lttng-traces/hostname/session_name */ char *full_session_path = NULL; - /* - * If base path is set, it overrides the session name for the - * session relative base path. No timestamp is appended if the - * base path is overridden. - * - * If the session name already contains the creation time (e.g. - * auto-, don't append yet another timestamp after - * the session name in the generated path. - * - * Otherwise, generate the path with session_name-. - */ - if (session->base_path[0] != '\0') { - pthread_mutex_lock(&session->lock); - ret = asprintf(&session_directory, "%s/%s", session->hostname, - session->base_path); - pthread_mutex_unlock(&session->lock); - } else if (session->session_name_contains_creation_time) { - pthread_mutex_lock(&session->lock); - ret = asprintf(&session_directory, "%s/%s", session->hostname, - session->session_name); - pthread_mutex_unlock(&session->lock); - } else { - char session_creation_datetime[16]; - size_t strftime_ret; - struct tm *timeinfo; - time_t creation_time; - - /* - * The 2.11+ protocol guarantees that a creation time - * is provided for a session. This would indicate a - * protocol error or an improper use of this util. - */ - if (!session->creation_time.is_set) { - ERR("Creation time missing for session \"%s\" (protocol error)", - session->session_name); - ret = -1; - goto end; - } - creation_time = LTTNG_OPTIONAL_GET(session->creation_time); - - timeinfo = localtime(&creation_time); - if (!timeinfo) { - ERR("Failed to get timeinfo while initializing session output directory handle"); - ret = -1; - goto end; - } - strftime_ret = strftime(session_creation_datetime, - sizeof(session_creation_datetime), - "%Y%m%d-%H%M%S", timeinfo); - if (strftime_ret == 0) { - ERR("Failed to format session creation timestamp while initializing session output directory handle"); - ret = -1; - goto end; - } - pthread_mutex_lock(&session->lock); - ret = asprintf(&session_directory, "%s/%s-%s", - session->hostname, session->session_name, - session_creation_datetime); - pthread_mutex_unlock(&session->lock); - } - if (ret < 0) { - PERROR("Failed to format session directory name"); - goto end; - } - - full_session_path = create_output_path(session_directory); + pthread_mutex_lock(&session->lock); + full_session_path = create_output_path(session->output_path); if (!full_session_path) { ret = -1; goto end; @@ -2365,7 +2328,7 @@ static int init_session_output_directory_handle(struct relay_session *session, goto end; } end: - free(session_directory); + pthread_mutex_unlock(&session->lock); free(full_session_path); return ret; } @@ -2540,7 +2503,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_close_trace_chunk *msg; - struct lttcomm_relayd_generic_reply reply = {}; + struct lttcomm_relayd_close_trace_chunk_reply reply = {}; struct lttng_buffer_view header_view; struct lttng_trace_chunk *chunk = NULL; enum lttng_error_code reply_code = LTTNG_OK; @@ -2548,6 +2511,12 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, uint64_t chunk_id; LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {}; time_t close_timestamp; + char closed_trace_chunk_path[LTTNG_PATH_MAX]; + size_t path_length = 0; + const char *chunk_name = NULL; + struct lttng_dynamic_buffer reply_payload; + + lttng_dynamic_buffer_init(&reply_payload); if (!session || !conn->version_check_done) { ERR("Trying to close a trace chunk before version check"); @@ -2623,6 +2592,53 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock_session; } } + chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, NULL); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to get chunk name"); + ret = -1; + reply_code = LTTNG_ERR_UNK; + goto end_unlock_session; + } + if (!session->has_rotated && !session->snapshot) { + ret = lttng_strncpy(closed_trace_chunk_path, + session->output_path, + sizeof(closed_trace_chunk_path)); + if (ret) { + ERR("Failed to send trace chunk path: path length of %zu bytes exceeds the maximal allowed length of %zu bytes", + strlen(session->output_path), + sizeof(closed_trace_chunk_path)); + reply_code = LTTNG_ERR_NOMEM; + ret = -1; + goto end_unlock_session; + } + } else { + if (session->snapshot) { + ret = snprintf(closed_trace_chunk_path, + sizeof(closed_trace_chunk_path), + "%s/%s", session->output_path, + chunk_name); + } else { + ret = snprintf(closed_trace_chunk_path, + sizeof(closed_trace_chunk_path), + "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY + "/%s", + session->output_path, chunk_name); + } + if (ret < 0 || ret == sizeof(closed_trace_chunk_path)) { + ERR("Failed to format closed trace chunk resulting path"); + reply_code = ret < 0 ? LTTNG_ERR_UNK : LTTNG_ERR_NOMEM; + ret = -1; + goto end_unlock_session; + } + } + DBG("Reply chunk path on close: %s", closed_trace_chunk_path); + path_length = strlen(closed_trace_chunk_path) + 1; + if (path_length > UINT32_MAX) { + ERR("Closed trace chunk path exceeds the maximal length allowed by the protocol"); + ret = -1; + reply_code = LTTNG_ERR_INVALID_PROTOCOL; + goto end_unlock_session; + } if (session->current_trace_chunk == chunk) { /* @@ -2642,18 +2658,37 @@ end_unlock_session: pthread_mutex_unlock(&session->lock); end: - reply.ret_code = htobe32((uint32_t) reply_code); + reply.generic.ret_code = htobe32((uint32_t) reply_code); + reply.path_length = htobe32((uint32_t) path_length); + ret = lttng_dynamic_buffer_append( + &reply_payload, &reply, sizeof(reply)); + if (ret) { + ERR("Failed to append \"close trace chunk\" command reply header to payload buffer"); + goto end_no_reply; + } + + if (reply_code == LTTNG_OK) { + ret = lttng_dynamic_buffer_append(&reply_payload, + closed_trace_chunk_path, path_length); + if (ret) { + ERR("Failed to append \"close trace chunk\" command reply path to payload buffer"); + goto end_no_reply; + } + } + send_ret = conn->sock->ops->sendmsg(conn->sock, - &reply, - sizeof(struct lttcomm_relayd_generic_reply), + reply_payload.data, + reply_payload.size, 0); - if (send_ret < (ssize_t) sizeof(reply)) { - ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", - send_ret); + if (send_ret < reply_payload.size) { + ERR("Failed to send \"close trace chunk\" command reply of %zu bytes (ret = %zd)", + reply_payload.size, send_ret); ret = -1; + goto end_no_reply; } end_no_reply: lttng_trace_chunk_put(chunk); + lttng_dynamic_buffer_reset(&reply_payload); return ret; } diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c index c4a62c986..31f27184f 100644 --- a/src/bin/lttng-relayd/session.c +++ b/src/bin/lttng-relayd/session.c @@ -34,6 +34,94 @@ static uint64_t last_relay_session_id; static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER; +static int init_session_output_path(struct relay_session *session) +{ + /* + * session_directory: + * + * if base_path is \0' + * hostname/session_name + * else + * hostname/base_path + */ + char *session_directory = NULL; + int ret = 0; + + if (session->output_path[0] != '\0') { + goto end; + } + /* + * If base path is set, it overrides the session name for the + * session relative base path. No timestamp is appended if the + * base path is overridden. + * + * If the session name already contains the creation time (e.g. + * auto-, don't append yet another timestamp after + * the session name in the generated path. + * + * Otherwise, generate the path with session_name-. + */ + if (session->base_path[0] != '\0') { + ret = asprintf(&session_directory, "%s/%s", session->hostname, + session->base_path); + } else if (session->session_name_contains_creation_time) { + ret = asprintf(&session_directory, "%s/%s", session->hostname, + session->session_name); + } else { + char session_creation_datetime[16]; + size_t strftime_ret; + struct tm *timeinfo; + time_t creation_time; + + /* + * The 2.11+ protocol guarantees that a creation time + * is provided for a session. This would indicate a + * protocol error or an improper use of this util. + */ + if (!session->creation_time.is_set) { + ERR("Creation time missing for session \"%s\" (protocol error)", + session->session_name); + ret = -1; + goto end; + } + creation_time = LTTNG_OPTIONAL_GET(session->creation_time); + + timeinfo = localtime(&creation_time); + if (!timeinfo) { + ERR("Failed to get timeinfo while initializing session output directory handle"); + ret = -1; + goto end; + } + strftime_ret = strftime(session_creation_datetime, + sizeof(session_creation_datetime), + "%Y%m%d-%H%M%S", timeinfo); + if (strftime_ret == 0) { + ERR("Failed to format session creation timestamp while initializing session output directory handle"); + ret = -1; + goto end; + } + ret = asprintf(&session_directory, "%s/%s-%s", + session->hostname, session->session_name, + session_creation_datetime); + } + if (ret < 0) { + PERROR("Failed to format session directory name"); + goto end; + } + + if (strlen(session_directory) >= LTTNG_PATH_MAX) { + ERR("Session output directory exceeds maximal length"); + ret = -1; + goto end; + } + strcpy(session->output_path, session_directory); + ret = 0; + +end: + free(session_directory); + return ret; +} + static int session_set_anonymous_chunk(struct relay_session *session) { int ret = 0; @@ -183,6 +271,10 @@ struct relay_session *session_create(const char *session_name, LTTNG_OPTIONAL_SET(&session->id_sessiond, *id_sessiond); } + ret = init_session_output_path(session); + if (ret) { + goto error; + } ret = sessiond_trace_chunk_registry_session_created( sessiond_trace_chunk_registry, sessiond_uuid); if (ret) { diff --git a/src/bin/lttng-relayd/session.h b/src/bin/lttng-relayd/session.h index b3d99012d..2a034daf3 100644 --- a/src/bin/lttng-relayd/session.h +++ b/src/bin/lttng-relayd/session.h @@ -56,6 +56,13 @@ struct relay_session { char session_name[LTTNG_NAME_MAX]; char hostname[LTTNG_HOST_NAME_MAX]; char base_path[LTTNG_PATH_MAX]; + /* + * Session output path relative to relayd's output path. + * Will be empty when interacting with peers < 2.11 since their + * streams' path are expressed relative to the relay daemon's + * output path. + */ + char output_path[LTTNG_PATH_MAX]; uint32_t live_timer; /* Session in snapshot mode. */ @@ -88,6 +95,8 @@ struct relay_session { bool aborted; bool session_name_contains_creation_time; + /* Whether session has performed an explicit rotation. */ + bool has_rotated; /* Contains ctf_trace object of that session indexed by path name. */ struct lttng_ht *ctf_traces_ht; diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index 4447747de..48aa79298 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -4562,7 +4562,7 @@ enum lttng_error_code snapshot_record(struct ltt_session *session, } if (session_close_trace_chunk( - session, session->current_trace_chunk, NULL)) { + session, session->current_trace_chunk, NULL, NULL)) { /* * Don't goto end; make sure the chunk is closed for the session * to allow future snapshots. @@ -4851,7 +4851,8 @@ int cmd_rotate_session(struct ltt_session *session, quiet_rotation ? NULL : &((enum lttng_trace_chunk_command_type){ - LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED})); + LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED}), + session->last_chunk_path); if (ret) { cmd_ret = LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; goto error; @@ -4972,6 +4973,16 @@ int cmd_rotate_get_info(struct ltt_session *session, sizeof(info_return->location.local.absolute_path); info_return->location_type = (int8_t) LTTNG_TRACE_ARCHIVE_LOCATION_TYPE_LOCAL; + fmt_ret = asprintf(&chunk_path, + "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s", + session_get_base_path(session), + session->last_archived_chunk_name); + if (fmt_ret == -1) { + PERROR("Failed to format the path of the last archived trace chunk"); + info_return->status = LTTNG_ROTATION_STATUS_ERROR; + cmd_ret = LTTNG_ERR_UNK; + goto end; + } break; case CONSUMER_DST_NET: current_tracing_path_reply = @@ -4997,20 +5008,17 @@ int cmd_rotate_get_info(struct ltt_session *session, &info_return->location.relay.ports.data); info_return->location_type = (int8_t) LTTNG_TRACE_ARCHIVE_LOCATION_TYPE_RELAY; + chunk_path = strdup(session->last_chunk_path); + if (!chunk_path) { + ERR("Failed to allocate the path of the last archived trace chunk"); + info_return->status = LTTNG_ROTATION_STATUS_ERROR; + cmd_ret = LTTNG_ERR_UNK; + goto end; + } break; default: abort(); } - fmt_ret = asprintf(&chunk_path, - "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s", - session_get_base_path(session), - session->last_archived_chunk_name); - if (fmt_ret == -1) { - PERROR("Failed to format the path of the last archived trace chunk"); - info_return->status = LTTNG_ROTATION_STATUS_ERROR; - cmd_ret = LTTNG_ERR_UNK; - goto end; - } fmt_ret = lttng_strncpy(current_tracing_path_reply, chunk_path, current_tracing_path_reply_len); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 4480da368..d6a6e8cbd 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1095,6 +1095,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, } if (type == LTTNG_STREAM_CONTROL) { + char output_path[LTTNG_PATH_MAX] = {}; + ret = relayd_create_session(rsock, &msg.u.relayd_sock.relayd_session_id, session_name, hostname, base_path, @@ -1102,12 +1104,15 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, consumer->snapshot, session_id, sessiond_uuid, current_chunk_id, session_creation_time, - session_name_contains_creation_time); + session_name_contains_creation_time, + output_path); if (ret < 0) { /* Close the control socket. */ (void) relayd_close(rsock); goto error; } + DBG("Created session on relay, output path reply: %s", + output_path); } msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; @@ -1840,7 +1845,8 @@ error: */ int consumer_close_trace_chunk(struct consumer_socket *socket, uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk) + struct lttng_trace_chunk *chunk, + char *closed_trace_chunk_path) { int ret; enum lttng_trace_chunk_status chunk_status; @@ -1848,12 +1854,15 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, .u.close_trace_chunk.session_id = session_id, }; + struct lttcomm_consumer_close_trace_chunk_reply reply; uint64_t chunk_id; time_t close_timestamp; enum lttng_trace_chunk_command_type close_command; const char *close_command_name = "none"; + struct lttng_dynamic_buffer path_reception_buffer; assert(socket); + lttng_dynamic_buffer_init(&path_reception_buffer); if (relayd_id != -1ULL) { LTTNG_OPTIONAL_SET( @@ -1904,13 +1913,51 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, relayd_id, session_id, chunk_id, close_command_name); health_code_update(); - ret = consumer_send_msg(socket, &msg); + ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } - + ret = consumer_socket_recv(socket, &reply, sizeof(reply)); + if (ret < 0) { + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + if (reply.path_length >= LTTNG_PATH_MAX) { + ERR("Invalid path returned by relay daemon: %" PRIu32 "bytes exceeds maximal allowed length of %d bytes", + reply.path_length, LTTNG_PATH_MAX); + ret = -LTTNG_ERR_INVALID_PROTOCOL; + goto error; + } + ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, + reply.path_length); + if (ret) { + ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command"); + ret = -LTTNG_ERR_NOMEM; + goto error; + } + ret = consumer_socket_recv(socket, path_reception_buffer.data, + path_reception_buffer.size); + if (ret < 0) { + ERR("Communication error while receiving path of closed trace chunk"); + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + if (path_reception_buffer.data[path_reception_buffer.size - 1] != '\0') { + ERR("Invalid path returned by relay daemon: not null-terminated"); + ret = -LTTNG_ERR_INVALID_PROTOCOL; + goto error; + } + if (closed_trace_chunk_path) { + /* + * closed_trace_chunk_path is assumed to have a length >= + * LTTNG_PATH_MAX + */ + memcpy(closed_trace_chunk_path, path_reception_buffer.data, + path_reception_buffer.size); + } error: + lttng_dynamic_buffer_reset(&path_reception_buffer); health_code_update(); return ret; } diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index af07f5ccf..3d5fc6fc9 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -318,7 +318,8 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, struct lttng_trace_chunk *chunk); int consumer_close_trace_chunk(struct consumer_socket *socket, uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk); + struct lttng_trace_chunk *chunk, + char *closed_trace_chunk_path); int consumer_trace_chunk_exists(struct consumer_socket *socket, uint64_t relayd_id, uint64_t session_id, struct lttng_trace_chunk *chunk, diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 7fba07bb4..e0ab7f4a5 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -262,15 +262,15 @@ struct lttng_trace_archive_location *session_get_trace_archive_location( goto end; } - ret = asprintf(&chunk_path, "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s", - session_get_base_path(session), - session->last_archived_chunk_name); - if (ret == -1) { - goto end; - } - switch (session_get_consumer_destination_type(session)) { case CONSUMER_DST_LOCAL: + ret = asprintf(&chunk_path, + "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s", + session_get_base_path(session), + session->last_archived_chunk_name); + if (ret == -1) { + goto end; + } location = lttng_trace_archive_location_local_create( chunk_path); break; @@ -286,7 +286,7 @@ struct lttng_trace_archive_location *session_get_trace_archive_location( location = lttng_trace_archive_location_relay_create( hostname, LTTNG_TRACE_ARCHIVE_LOCATION_RELAY_PROTOCOL_TYPE_TCP, - control_port, data_port, chunk_path); + control_port, data_port, session->last_chunk_path); break; } default: @@ -663,7 +663,8 @@ error: int session_close_trace_chunk(const struct ltt_session *session, struct lttng_trace_chunk *trace_chunk, - const enum lttng_trace_chunk_command_type *close_command) + const enum lttng_trace_chunk_command_type *close_command, + char *closed_trace_chunk_path) { int ret = 0; bool error_occurred = false; @@ -707,7 +708,7 @@ int session_close_trace_chunk(const struct ltt_session *session, ret = consumer_close_trace_chunk(socket, relayd_id, session->id, - trace_chunk); + trace_chunk, closed_trace_chunk_path); pthread_mutex_unlock(socket->lock); if (ret) { ERR("Failed to close trace chunk on user space consumer"); @@ -726,7 +727,7 @@ int session_close_trace_chunk(const struct ltt_session *session, ret = consumer_close_trace_chunk(socket, relayd_id, session->id, - trace_chunk); + trace_chunk, closed_trace_chunk_path); pthread_mutex_unlock(socket->lock); if (ret) { ERR("Failed to close trace chunk on kernel consumer"); diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 204701429..31a40a741 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -77,6 +77,8 @@ struct ltt_session { bool has_auto_generated_name; bool name_contains_creation_time; char hostname[HOST_NAME_MAX]; /* Local hostname. */ + /* Path of the last closed chunk. */ + char last_chunk_path[LTTNG_PATH_MAX]; time_t creation_time; struct ltt_kernel_session *kernel_session; struct ltt_ust_session *ust_session; @@ -251,7 +253,8 @@ int session_set_trace_chunk(struct ltt_session *session, */ int session_close_trace_chunk(const struct ltt_session *session, struct lttng_trace_chunk *trace_chunk, - const enum lttng_trace_chunk_command_type *close_command); + const enum lttng_trace_chunk_command_type *close_command, + char *path); bool session_output_supports_trace_chunks(const struct ltt_session *session); diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index a53b7383e..716b560a9 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -4580,11 +4580,13 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( * channels. */ enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; DBG("Failed to set new trace chunk on existing channels, rolling back"); close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, - chunk_creation_timestamp, NULL); + chunk_creation_timestamp, NULL, + path); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, chunk_id); @@ -4610,12 +4612,13 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (!relayd || ret) { enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, chunk_creation_timestamp, - NULL); + NULL, path); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, @@ -4637,7 +4640,8 @@ end: enum lttcomm_return_code lttng_consumer_close_trace_chunk( const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id, time_t chunk_close_timestamp, - const enum lttng_trace_chunk_command_type *close_command) + const enum lttng_trace_chunk_command_type *close_command, + char *path) { enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_trace_chunk *chunk; @@ -4735,7 +4739,8 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_close_trace_chunk( - &relayd->control_sock, chunk); + &relayd->control_sock, chunk, + path); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 13fc61617..1aaddb5ab 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -861,7 +861,8 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( enum lttcomm_return_code lttng_consumer_close_trace_chunk( const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id, time_t chunk_close_timestamp, - const enum lttng_trace_chunk_command_type *close_command); + const enum lttng_trace_chunk_command_type *close_command, + char *path); enum lttcomm_return_code lttng_consumer_trace_chunk_exists( const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id); diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index f9278f75d..29a27134f 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1250,6 +1250,8 @@ error_rotate_channel: msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char path[LTTNG_PATH_MAX]; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? @@ -1260,8 +1262,18 @@ error_rotate_channel: (time_t) msg.u.close_trace_chunk.close_timestamp, msg.u.close_trace_chunk.close_command.is_set ? &close_command : - NULL); - goto end_msg_sessiond; + NULL, path); + reply.ret_code = ret_code; + reply.path_length = strlen(path) + 1; + ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret != sizeof(reply)) { + goto error_fatal; + } + ret = lttcomm_send_unix_sock(sock, path, reply.path_length); + if (ret != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 8e6b71173..49997015e 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -143,7 +143,9 @@ static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock, const char *base_path, int session_live_timer, unsigned int snapshot, uint64_t sessiond_session_id, const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id, - time_t creation_time, bool session_name_contains_creation_time) + time_t creation_time, bool session_name_contains_creation_time, + struct lttcomm_relayd_create_session_reply_2_11 *reply, + char *output_path) { int ret; struct lttcomm_relayd_create_session_2_11 *msg = NULL; @@ -212,6 +214,24 @@ static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock, if (ret < 0) { goto error; } + /* Receive response */ + ret = recv_reply(rsock, reply, sizeof(*reply)); + if (ret < 0) { + goto error; + } + reply->generic.session_id = be64toh(reply->generic.session_id); + reply->generic.ret_code = be32toh(reply->generic.ret_code); + reply->output_path_length = be32toh(reply->output_path_length); + if (reply->output_path_length >= LTTNG_PATH_MAX) { + ERR("Invalid session output path length in reply (%" PRIu32 " bytes) exceeds maximal allowed length (%d bytes)", + reply->output_path_length, LTTNG_PATH_MAX); + ret = -1; + goto error; + } + ret = recv_reply(rsock, output_path, reply->output_path_length); + if (ret < 0) { + goto error; + } error: free(msg); return ret; @@ -222,7 +242,8 @@ error: */ static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock, const char *session_name, const char *hostname, - int session_live_timer, unsigned int snapshot) + int session_live_timer, unsigned int snapshot, + struct lttcomm_relayd_status_session *reply) { int ret; struct lttcomm_relayd_create_session_2_4 msg; @@ -245,6 +266,13 @@ static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock, goto error; } + /* Receive response */ + ret = recv_reply(rsock, reply, sizeof(*reply)); + if (ret < 0) { + goto error; + } + reply->session_id = be64toh(reply->session_id); + reply->ret_code = be32toh(reply->ret_code); error: return ret; } @@ -252,7 +280,8 @@ error: /* * RELAYD_CREATE_SESSION from 2.1 to 2.3. */ -static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock) +static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock, + struct lttcomm_relayd_status_session *reply) { int ret; @@ -262,6 +291,13 @@ static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock) goto error; } + /* Receive response */ + ret = recv_reply(rsock, reply, sizeof(*reply)); + if (ret < 0) { + goto error; + } + reply->session_id = be64toh(reply->session_id); + reply->ret_code = be32toh(reply->ret_code); error: return ret; } @@ -280,10 +316,11 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock, unsigned int snapshot, uint64_t sessiond_session_id, const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id, - time_t creation_time, bool session_name_contains_creation_time) + time_t creation_time, bool session_name_contains_creation_time, + char *output_path) { int ret; - struct lttcomm_relayd_status_session reply; + struct lttcomm_relayd_create_session_reply_2_11 reply = {}; assert(rsock); assert(relayd_session_id); @@ -292,44 +329,38 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock, if (rsock->minor < 4) { /* From 2.1 to 2.3 */ - ret = relayd_create_session_2_1(rsock); + ret = relayd_create_session_2_1(rsock, &reply.generic); } else if (rsock->minor >= 4 && rsock->minor < 11) { /* From 2.4 to 2.10 */ ret = relayd_create_session_2_4(rsock, session_name, - hostname, session_live_timer, snapshot); + hostname, session_live_timer, snapshot, + &reply.generic); } else { /* From 2.11 to ... */ ret = relayd_create_session_2_11(rsock, session_name, hostname, base_path, session_live_timer, snapshot, sessiond_session_id, sessiond_uuid, current_chunk_id, creation_time, - session_name_contains_creation_time); + session_name_contains_creation_time, + &reply, output_path); } if (ret < 0) { goto error; } - /* Receive response */ - ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); - if (ret < 0) { - goto error; - } - - reply.session_id = be64toh(reply.session_id); - reply.ret_code = be32toh(reply.ret_code); - /* Return session id or negative ret code. */ - if (reply.ret_code != LTTNG_OK) { + if (reply.generic.ret_code != LTTNG_OK) { ret = -1; - ERR("Relayd create session replied error %d", reply.ret_code); + ERR("Relayd create session replied error %d", + reply.generic.ret_code); goto error; } else { ret = 0; - *relayd_session_id = reply.session_id; + *relayd_session_id = reply.generic.session_id; } - DBG("Relayd session created with id %" PRIu64, reply.session_id); + DBG("Relayd session created with id %" PRIu64, reply.generic.session_id); error: return ret; @@ -1342,12 +1373,13 @@ end: } int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock, - struct lttng_trace_chunk *chunk) + struct lttng_trace_chunk *chunk, + char *path) { int ret = 0; enum lttng_trace_chunk_status status; struct lttcomm_relayd_close_trace_chunk msg = {}; - struct lttcomm_relayd_generic_reply reply = {}; + struct lttcomm_relayd_close_trace_chunk_reply reply = {}; uint64_t chunk_id; time_t close_timestamp; LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {}; @@ -1407,11 +1439,29 @@ int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock, goto end; } - reply.ret_code = be32toh(reply.ret_code); - if (reply.ret_code != LTTNG_OK) { + reply.path_length = be32toh(reply.path_length); + if (reply.path_length >= LTTNG_PATH_MAX) { + ERR("Chunk path too long"); + ret = -1; + goto end; + } + + ret = recv_reply(sock, path, reply.path_length); + if (ret < 0) { + ERR("Failed to receive relay daemon trace chunk close command reply"); + goto end; + } + if (path[reply.path_length - 1] != '\0') { + ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)"); + ret = -1; + goto end; + } + + reply.generic.ret_code = be32toh(reply.generic.ret_code); + if (reply.generic.ret_code != LTTNG_OK) { ret = -1; ERR("Relayd trace chunk close replied error %d", - reply.ret_code); + reply.generic.ret_code); } else { ret = 0; DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64, diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 8d4c9da13..5c5368391 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -46,7 +46,8 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock, unsigned int snapshot, uint64_t sessiond_session_id, const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id, - time_t creation_time, bool session_name_contains_creation_time); + time_t creation_time, bool session_name_contains_creation_time, + char *output_path); int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name, const char *pathname, uint64_t *stream_id, uint64_t tracefile_size, uint64_t tracefile_count, @@ -78,7 +79,8 @@ int relayd_rotate_streams(struct lttcomm_relayd_sock *sock, int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, struct lttng_trace_chunk *chunk); int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock, - struct lttng_trace_chunk *chunk); + struct lttng_trace_chunk *chunk, + char *path); int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock, uint64_t chunk_id, bool *chunk_exists); diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index cc26a2c6b..efb577e99 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -222,6 +222,13 @@ struct lttcomm_relayd_create_session_2_11 { char names[]; } LTTNG_PACKED; +struct lttcomm_relayd_create_session_reply_2_11 { + struct lttcomm_relayd_status_session generic; + /* Includes the '\0' terminator. */ + uint32_t output_path_length; + char output_path[]; +} LTTNG_PACKED; + /* * Used to ask the relay to reset the metadata trace file (regeneration). * Send the new version of the metadata (starts at 0). @@ -270,6 +277,13 @@ struct lttcomm_relayd_close_trace_chunk { LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command; } LTTNG_PACKED; +struct lttcomm_relayd_close_trace_chunk_reply { + struct lttcomm_relayd_generic_reply generic; + /* Includes trailing NULL. */ + uint32_t path_length; + char path[]; +} LTTNG_PACKED; + struct lttcomm_relayd_trace_chunk_exists { uint64_t chunk_id; } LTTNG_PACKED; diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 9e757f1fd..6fc65a103 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -695,6 +695,12 @@ struct lttcomm_consumer_status_channel { #ifdef HAVE_LIBLTTNG_UST_CTL +struct lttcomm_consumer_close_trace_chunk_reply { + enum lttcomm_return_code ret_code; + uint32_t path_length; + char path[]; +}; + #include /* diff --git a/src/common/trace-chunk.c b/src/common/trace-chunk.c index dbab99d19..e853d7366 100644 --- a/src/common/trace-chunk.c +++ b/src/common/trace-chunk.c @@ -366,12 +366,14 @@ enum lttng_trace_chunk_status lttng_trace_chunk_set_close_timestamp( goto end; } LTTNG_OPTIONAL_SET(&chunk->timestamp_close, close_ts); - free(chunk->name); - chunk->name = generate_chunk_name(LTTNG_OPTIONAL_GET(chunk->id), - LTTNG_OPTIONAL_GET(chunk->timestamp_creation), - &close_ts); - if (!chunk->name) { - status = LTTNG_TRACE_CHUNK_STATUS_ERROR; + if (!chunk->name_overridden) { + free(chunk->name); + chunk->name = generate_chunk_name(LTTNG_OPTIONAL_GET(chunk->id), + LTTNG_OPTIONAL_GET(chunk->timestamp_creation), + &close_ts); + if (!chunk->name) { + status = LTTNG_TRACE_CHUNK_STATUS_ERROR; + } } end: pthread_mutex_unlock(&chunk->lock); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 2963b16ed..01e27bf66 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2096,6 +2096,9 @@ end_rotate_channel_nosignal: msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char closed_trace_chunk_path[LTTNG_PATH_MAX]; + int ret; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? @@ -2106,8 +2109,19 @@ end_rotate_channel_nosignal: (time_t) msg.u.close_trace_chunk.close_timestamp, msg.u.close_trace_chunk.close_command.is_set ? &close_command : - NULL); - goto end_msg_sessiond; + NULL, closed_trace_chunk_path); + reply.ret_code = ret_code; + reply.path_length = strlen(closed_trace_chunk_path) + 1; + ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret != sizeof(reply)) { + goto error_fatal; + } + ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, + reply.path_length); + if (ret != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: {