From bbc4768c20f1c552222e1746f9475d145d7bf04e Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Thu, 18 Jul 2019 15:32:57 -0400 Subject: [PATCH] relayd: add remote trace chunk close command MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/main.c | 105 +++++++++++++++++++ src/bin/lttng-sessiond/cmd.c | 15 +-- src/bin/lttng-sessiond/consumer.c | 35 +++++-- src/bin/lttng-sessiond/session.c | 14 ++- src/bin/lttng-sessiond/session.h | 3 +- src/common/consumer/consumer.c | 72 ++++++++++--- src/common/consumer/consumer.h | 3 +- src/common/kernel-consumer/kernel-consumer.c | 10 +- src/common/relayd/relayd.c | 82 ++++++++++++++- src/common/relayd/relayd.h | 2 + src/common/sessiond-comm/relayd.h | 8 ++ src/common/sessiond-comm/sessiond-comm.h | 4 + src/common/trace-chunk.c | 41 +++++++- src/common/trace-chunk.h | 9 ++ src/common/ust-consumer/ust-consumer.c | 10 +- 15 files changed, 366 insertions(+), 47 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 11277f293..b5ae34ebf 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2853,6 +2853,107 @@ end_no_reply: return ret; } +/* + * relay_close_trace_chunk: close a trace chunk + */ +static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_close_trace_chunk *msg; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_buffer_view header_view; + struct lttng_trace_chunk *chunk = NULL; + enum lttng_error_code reply_code = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; + uint64_t chunk_id; + LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command; + time_t close_timestamp; + + if (!session || !conn->version_check_done) { + ERR("Trying to close a trace chunk before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("Chunk close command is unsupported before 2.11"); + ret = -1; + goto end_no_reply; + } + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk close command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + chunk_id = be64toh(msg->chunk_id); + close_timestamp = (time_t) be64toh(msg->close_timestamp); + close_command = (typeof(close_command)){ + .value = be32toh(msg->close_command.value), + .is_set = msg->close_command.is_set, + }; + + chunk = sessiond_trace_chunk_registry_get_chunk( + sessiond_trace_chunk_registry, + conn->session->sessiond_uuid, + conn->session->id, + chunk_id); + if (!chunk) { + char uuid_str[UUID_STR_LEN]; + + lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); + ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, + uuid_str, + conn->session->id, + msg->chunk_id); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + chunk_status = lttng_trace_chunk_set_close_timestamp( + chunk, close_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set trace chunk close timestamp"); + ret = -1; + reply_code = LTTNG_ERR_UNK; + goto end; + } + + if (close_command.is_set) { + chunk_status = lttng_trace_chunk_set_close_command( + chunk, close_command.value); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + reply_code = LTTNG_ERR_INVALID; + goto end; + } + } + +end: + reply.ret_code = htobe32((uint32_t) reply_code); + send_ret = conn->sock->ops->sendmsg(conn->sock, + &reply, + sizeof(struct lttcomm_relayd_generic_reply), + 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: + lttng_trace_chunk_put(chunk); + return ret; +} + #define DBG_CMD(cmd_name, conn) \ DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); @@ -2923,6 +3024,10 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); ret = relay_create_trace_chunk(header, conn, payload); break; + case RELAYD_CLOSE_TRACE_CHUNK: + DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn); + ret = relay_close_trace_chunk(header, conn, payload); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", header->cmd); diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index d26d8de0b..6e5128917 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -4504,7 +4504,8 @@ enum lttng_error_code snapshot_record(struct ltt_session *session, } } - if (session_close_trace_chunk(session, session->current_trace_chunk)) { + if (session_close_trace_chunk( + session, session->current_trace_chunk, NULL)) { /* * Don't goto end; make sure the chunk is closed for the session * to allow future snapshots. @@ -4750,14 +4751,6 @@ int cmd_rotate_session(struct ltt_session *session, &ongoing_rotation_chunk_id); assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - chunk_status = lttng_trace_chunk_set_close_command( - chunk_being_archived, - LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); - if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { - cmd_ret = LTTNG_ERR_FATAL; - goto error; - } - if (session->kernel_session) { cmd_ret = kernel_rotate_session(session); if (cmd_ret != LTTNG_OK) { @@ -4771,7 +4764,9 @@ int cmd_rotate_session(struct ltt_session *session, } } - ret = session_close_trace_chunk(session, chunk_being_archived); + ret = session_close_trace_chunk(session, chunk_being_archived, + &((enum lttng_trace_chunk_command_type) { + LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED})); if (ret) { cmd_ret = LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; goto error; diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 1bbee02f2..155f2b053 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1845,17 +1845,34 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, int ret; enum lttng_trace_chunk_status chunk_status; struct lttcomm_consumer_msg msg = { - .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, - .u.close_trace_chunk.session_id = session_id, + .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, + .u.close_trace_chunk.session_id = session_id, }; uint64_t chunk_id; time_t close_timestamp; + enum lttng_trace_chunk_command_type close_command; + const char *close_command_name = "none"; assert(socket); if (relayd_id != -1ULL) { - LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id, - relayd_id); + LTTNG_OPTIONAL_SET( + &msg.u.close_trace_chunk.relayd_id, relayd_id); + } + + chunk_status = lttng_trace_chunk_get_close_command( + chunk, &close_command); + switch (chunk_status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command, + (uint32_t) close_command); + break; + case LTTNG_TRACE_CHUNK_STATUS_NONE: + break; + default: + ERR("Failed to get trace chunk close command"); + ret = -1; + goto error; } chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); @@ -1877,10 +1894,14 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp; + if (msg.u.close_trace_chunk.close_command.is_set) { + close_command_name = lttng_trace_chunk_command_type_get_name( + close_command); + } DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64 - ", session_id = %" PRIu64 - ", chunk_id = %" PRIu64, - relayd_id, session_id, chunk_id); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", close command = \"%s\"", + relayd_id, session_id, chunk_id, close_command_name); health_code_update(); ret = consumer_send_msg(socket, &msg); diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 61cbb0c1a..4b5c22bfb 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -677,7 +677,8 @@ error: } int session_close_trace_chunk(const struct ltt_session *session, - struct lttng_trace_chunk *trace_chunk) + struct lttng_trace_chunk *trace_chunk, + const enum lttng_trace_chunk_command_type *close_command) { int ret = 0; bool error_occurred = false; @@ -686,6 +687,15 @@ int session_close_trace_chunk(const struct ltt_session *session, enum lttng_trace_chunk_status chunk_status; const time_t chunk_close_timestamp = time(NULL); + if (close_command) { + chunk_status = lttng_trace_chunk_set_close_command( + trace_chunk, *close_command); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + } + if (chunk_close_timestamp == (time_t) -1) { ERR("Failed to sample the close timestamp of the current trace chunk of session \"%s\"", session->name); @@ -784,7 +794,7 @@ void session_release(struct urcu_ref *ref) session_notify_destruction(session); lttng_dynamic_array_reset(&session->destroy_notifiers); if (session->current_trace_chunk) { - ret = session_close_trace_chunk(session, session->current_trace_chunk); + ret = session_close_trace_chunk(session, session->current_trace_chunk, NULL); if (ret) { ERR("Failed to close the current trace chunk of session \"%s\" during its release", session->name); diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 0b4746cb5..0750e1bcd 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -245,6 +245,7 @@ int session_set_trace_chunk(struct ltt_session *session, * ltt_session itself. */ int session_close_trace_chunk(const struct ltt_session *session, - struct lttng_trace_chunk *trace_chunk); + struct lttng_trace_chunk *trace_chunk, + const enum lttng_trace_chunk_command_type *close_command); #endif /* _LTT_SESSION_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index dea7b7f51..3896875f5 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -4497,7 +4497,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( DBG("Failed to set new trace chunk on existing channels, rolling back"); close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, - chunk_creation_timestamp); + chunk_creation_timestamp, NULL); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, chunk_id); @@ -4527,7 +4527,8 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, - chunk_creation_timestamp); + chunk_creation_timestamp, + NULL); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, @@ -4548,12 +4549,14 @@ end: enum lttcomm_return_code lttng_consumer_close_trace_chunk( const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, time_t chunk_close_timestamp) + uint64_t chunk_id, time_t chunk_close_timestamp, + const enum lttng_trace_chunk_command_type *close_command) { enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_trace_chunk *chunk; char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; const char *relayd_id_str = "(none)"; + const char *close_command_name = "none"; struct lttng_ht_iter iter; struct lttng_consumer_channel *channel; enum lttng_trace_chunk_status chunk_status; @@ -4569,16 +4572,21 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( } else { relayd_id_str = "(formatting error)"; } - } + } + if (close_command) { + close_command_name = lttng_trace_chunk_command_type_get_name( + *close_command); + } DBG("Consumer close trace chunk command: relayd_id = %s" - ", session_id = %" PRIu64 - ", chunk_id = %" PRIu64, relayd_id_str, - session_id, chunk_id); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", close command = %s", + relayd_id_str, session_id, chunk_id, + close_command_name); + chunk = lttng_trace_chunk_registry_find_chunk( - consumer_data.chunk_registry, session_id, - chunk_id); - if (!chunk) { + consumer_data.chunk_registry, session_id, chunk_id); + if (!chunk) { ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, chunk_id); @@ -4592,12 +4600,15 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; goto end; } - /* - * Release the reference returned by the "find" operation and - * the session daemon's implicit reference to the chunk. - */ - lttng_trace_chunk_put(chunk); - lttng_trace_chunk_put(chunk); + + if (close_command) { + chunk_status = lttng_trace_chunk_set_close_command( + chunk, *close_command); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + goto end; + } + } /* * chunk is now invalid to access as we no longer hold a reference to @@ -4628,8 +4639,37 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; } } + + if (relayd_id) { + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(*relayd_id); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_close_trace_chunk( + &relayd->control_sock, chunk); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, + *relayd_id); + } + + if (!relayd || ret) { + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + goto error_unlock; + } + } +error_unlock: rcu_read_unlock(); end: + /* + * Release the reference returned by the "find" operation and + * the session daemon's implicit reference to the chunk. + */ + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(chunk); + return ret_code; } diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index f514aba71..a0b81c6fa 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -844,7 +844,8 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( struct lttng_directory_handle *chunk_directory_handle); enum lttcomm_return_code lttng_consumer_close_trace_chunk( const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, time_t chunk_close_timestamp); + uint64_t chunk_id, time_t chunk_close_timestamp, + const enum lttng_trace_chunk_command_type *close_command); enum lttcomm_return_code lttng_consumer_trace_chunk_exists( const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id); diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index edf8acb82..29ccd0f09 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1239,15 +1239,21 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { + enum lttng_trace_chunk_command_type close_command = + msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : NULL, + &relayd_id : + NULL, msg.u.close_trace_chunk.session_id, msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp); + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? + &close_command : + NULL); goto end_msg_sessiond; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index b065f364e..830c7c263 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -1252,11 +1252,8 @@ int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, } } - ret = send_command(sock, - RELAYD_CREATE_TRACE_CHUNK, - payload.data, - payload.size, - 0); + ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data, + payload.size, 0); if (ret < 0) { ERR("Failed to send trace chunk creation command to relay daemon"); goto end; @@ -1283,3 +1280,78 @@ end: lttng_dynamic_buffer_reset(&payload); return ret; } + +int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock, + struct lttng_trace_chunk *chunk) +{ + int ret = 0; + enum lttng_trace_chunk_status status; + struct lttcomm_relayd_close_trace_chunk msg = {}; + struct lttcomm_relayd_generic_reply reply = {}; + uint64_t chunk_id; + time_t close_timestamp; + LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {}; + + status = lttng_trace_chunk_get_id(chunk, &chunk_id); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to get trace chunk id"); + ret = -1; + goto end; + } + + status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to get trace chunk close timestamp"); + ret = -1; + goto end; + } + + status = lttng_trace_chunk_get_close_command(chunk, + &close_command.value); + switch (status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + close_command.is_set = 1; + break; + case LTTNG_TRACE_CHUNK_STATUS_NONE: + break; + default: + ERR("Failed to get trace chunk close command"); + ret = -1; + goto end; + } + + msg = (typeof(msg)){ + .chunk_id = htobe64(chunk_id), + .close_timestamp = htobe64((uint64_t) close_timestamp), + .close_command = { + .value = htobe32((uint32_t) close_command.value), + .is_set = close_command.is_set, + }, + }; + + ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg), + 0); + if (ret < 0) { + ERR("Failed to send trace chunk close command to relay daemon"); + goto end; + } + + ret = recv_reply(sock, &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to receive relay daemon trace chunk close command reply"); + goto end; + } + + reply.ret_code = be32toh(reply.ret_code); + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd trace chunk close replied error %d", + reply.ret_code); + } else { + ret = 0; + DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64, + chunk_id); + } +end: + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 695ef6ffa..eb7782de0 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -61,5 +61,7 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, uint64_t new_chunk_id, uint64_t seq_num); int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, struct lttng_trace_chunk *chunk); +int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock, + struct lttng_trace_chunk *chunk); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index f86dde3d1..1568f5113 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -246,4 +246,12 @@ struct lttcomm_relayd_create_trace_chunk { char override_name[]; } LTTNG_PACKED; +struct lttcomm_relayd_close_trace_chunk { + uint64_t chunk_id; + /* Seconds since EPOCH. */ + uint64_t close_timestamp; + /* enum lttng_trace_chunk_command_type */ + LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command; +} LTTNG_PACKED; + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index d5009740e..aeefdaaa6 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -135,6 +135,8 @@ enum lttcomm_relayd_command { RELAYD_ROTATE_STREAM = 18, /* Ask the relay to create a trace chunk (2.11+) */ RELAYD_CREATE_TRACE_CHUNK = 19, + /* Ask the relay to close a trace chunk (2.11+) */ + RELAYD_CLOSE_TRACE_CHUNK = 20, }; /* @@ -645,6 +647,8 @@ struct lttcomm_consumer_msg { uint64_t session_id; uint64_t chunk_id; uint64_t close_timestamp; + /* enum lttng_trace_chunk_command_type */ + LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command; } LTTNG_PACKED close_trace_chunk; struct { LTTNG_OPTIONAL_COMM(uint64_t) LTTNG_PACKED relayd_id; diff --git a/src/common/trace-chunk.c b/src/common/trace-chunk.c index 488d7ebc4..3f472b435 100644 --- a/src/common/trace-chunk.c +++ b/src/common/trace-chunk.c @@ -829,7 +829,16 @@ void lttng_trace_chunk_move_to_completed(struct lttng_trace_chunk *trace_chunk) LTTNG_OPTIONAL_GET(trace_chunk->timestamp_close); LTTNG_OPTIONAL(struct lttng_directory_handle) archived_chunks_directory; - assert(trace_chunk->mode.is_set); + if (!trace_chunk->mode.is_set || + trace_chunk->mode.value != TRACE_CHUNK_MODE_OWNER || + !trace_chunk->session_output_directory.is_set) { + /* + * This command doesn't need to run if the output is remote + * or if the trace chunk is not owned by this process. + */ + goto end; + } + assert(trace_chunk->mode.value == TRACE_CHUNK_MODE_OWNER); assert(!trace_chunk->name_overriden); @@ -949,6 +958,24 @@ end: } } +LTTNG_HIDDEN +enum lttng_trace_chunk_status lttng_trace_chunk_get_close_command( + struct lttng_trace_chunk *chunk, + enum lttng_trace_chunk_command_type *command_type) +{ + enum lttng_trace_chunk_status status = LTTNG_TRACE_CHUNK_STATUS_OK; + + pthread_mutex_lock(&chunk->lock); + if (chunk->close_command.is_set) { + *command_type = chunk->close_command.value; + status = LTTNG_TRACE_CHUNK_STATUS_OK; + } else { + status = LTTNG_TRACE_CHUNK_STATUS_NONE; + } + pthread_mutex_unlock(&chunk->lock); + return status; +} + LTTNG_HIDDEN enum lttng_trace_chunk_status lttng_trace_chunk_set_close_command( struct lttng_trace_chunk *chunk, @@ -977,6 +1004,18 @@ end_unlock: return status; } +LTTNG_HIDDEN +const char *lttng_trace_chunk_command_type_get_name( + enum lttng_trace_chunk_command_type command) +{ + switch (command) { + case LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED: + return "move to completed trace chunk folder"; + default: + abort(); + } +} + LTTNG_HIDDEN bool lttng_trace_chunk_get(struct lttng_trace_chunk *chunk) { diff --git a/src/common/trace-chunk.h b/src/common/trace-chunk.h index a318341b5..474e9e000 100644 --- a/src/common/trace-chunk.h +++ b/src/common/trace-chunk.h @@ -153,11 +153,20 @@ LTTNG_HIDDEN int lttng_trace_chunk_unlink_file(struct lttng_trace_chunk *chunk, const char *filename); +LTTNG_HIDDEN +enum lttng_trace_chunk_status lttng_trace_chunk_get_close_command( + struct lttng_trace_chunk *chunk, + enum lttng_trace_chunk_command_type *command_type); + LTTNG_HIDDEN enum lttng_trace_chunk_status lttng_trace_chunk_set_close_command( struct lttng_trace_chunk *chunk, enum lttng_trace_chunk_command_type command_type); +LTTNG_HIDDEN +const char *lttng_trace_chunk_command_type_get_name( + enum lttng_trace_chunk_command_type command); + /* Returns true on success. */ LTTNG_HIDDEN bool lttng_trace_chunk_get(struct lttng_trace_chunk *chunk); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0856cf0b9..0d61b866f 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2080,15 +2080,21 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { + enum lttng_trace_chunk_command_type close_command = + msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : NULL, + &relayd_id : + NULL, msg.u.close_trace_chunk.session_id, msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp); + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? + &close_command : + NULL); goto end_msg_sessiond; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: -- 2.34.1