return ret;
}
+/*
+ * relay_close_trace_chunk: close a trace chunk
+ */
+static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret = 0;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_close_trace_chunk *msg;
+ struct lttcomm_relayd_generic_reply reply = {};
+ struct lttng_buffer_view header_view;
+ struct lttng_trace_chunk *chunk = NULL;
+ enum lttng_error_code reply_code = LTTNG_OK;
+ enum lttng_trace_chunk_status chunk_status;
+ uint64_t chunk_id;
+ LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command;
+ time_t close_timestamp;
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to close a trace chunk before version check");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ if (session->major == 2 && session->minor < 11) {
+ ERR("Chunk close command is unsupported before 2.11");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+ if (!header_view.data) {
+ ERR("Failed to receive payload of chunk close command");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ /* Convert to host endianness. */
+ msg = (typeof(msg)) header_view.data;
+ chunk_id = be64toh(msg->chunk_id);
+ close_timestamp = (time_t) be64toh(msg->close_timestamp);
+ close_command = (typeof(close_command)){
+ .value = be32toh(msg->close_command.value),
+ .is_set = msg->close_command.is_set,
+ };
+
+ chunk = sessiond_trace_chunk_registry_get_chunk(
+ sessiond_trace_chunk_registry,
+ conn->session->sessiond_uuid,
+ conn->session->id,
+ chunk_id);
+ if (!chunk) {
+ char uuid_str[UUID_STR_LEN];
+
+ lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+ ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ uuid_str,
+ conn->session->id,
+ msg->chunk_id);
+ ret = -1;
+ reply_code = LTTNG_ERR_NOMEM;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_set_close_timestamp(
+ chunk, close_timestamp);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk close timestamp");
+ ret = -1;
+ reply_code = LTTNG_ERR_UNK;
+ goto end;
+ }
+
+ if (close_command.is_set) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, close_command.value);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ reply_code = LTTNG_ERR_INVALID;
+ goto end;
+ }
+ }
+
+end:
+ reply.ret_code = htobe32((uint32_t) reply_code);
+ send_ret = conn->sock->ops->sendmsg(conn->sock,
+ &reply,
+ sizeof(struct lttcomm_relayd_generic_reply),
+ 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+end_no_reply:
+ lttng_trace_chunk_put(chunk);
+ return ret;
+}
+
#define DBG_CMD(cmd_name, conn) \
DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
ret = relay_create_trace_chunk(header, conn, payload);
break;
+ case RELAYD_CLOSE_TRACE_CHUNK:
+ DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
+ ret = relay_close_trace_chunk(header, conn, payload);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", header->cmd);
}
}
- if (session_close_trace_chunk(session, session->current_trace_chunk)) {
+ if (session_close_trace_chunk(
+ session, session->current_trace_chunk, NULL)) {
/*
* Don't goto end; make sure the chunk is closed for the session
* to allow future snapshots.
&ongoing_rotation_chunk_id);
assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
- chunk_status = lttng_trace_chunk_set_close_command(
- chunk_being_archived,
- LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
- cmd_ret = LTTNG_ERR_FATAL;
- goto error;
- }
-
if (session->kernel_session) {
cmd_ret = kernel_rotate_session(session);
if (cmd_ret != LTTNG_OK) {
}
}
- ret = session_close_trace_chunk(session, chunk_being_archived);
+ ret = session_close_trace_chunk(session, chunk_being_archived,
+ &((enum lttng_trace_chunk_command_type) {
+ LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED}));
if (ret) {
cmd_ret = LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
goto error;
int ret;
enum lttng_trace_chunk_status chunk_status;
struct lttcomm_consumer_msg msg = {
- .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
- .u.close_trace_chunk.session_id = session_id,
+ .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
+ .u.close_trace_chunk.session_id = session_id,
};
uint64_t chunk_id;
time_t close_timestamp;
+ enum lttng_trace_chunk_command_type close_command;
+ const char *close_command_name = "none";
assert(socket);
if (relayd_id != -1ULL) {
- LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id,
- relayd_id);
+ LTTNG_OPTIONAL_SET(
+ &msg.u.close_trace_chunk.relayd_id, relayd_id);
+ }
+
+ chunk_status = lttng_trace_chunk_get_close_command(
+ chunk, &close_command);
+ switch (chunk_status) {
+ case LTTNG_TRACE_CHUNK_STATUS_OK:
+ LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command,
+ (uint32_t) close_command);
+ break;
+ case LTTNG_TRACE_CHUNK_STATUS_NONE:
+ break;
+ default:
+ ERR("Failed to get trace chunk close command");
+ ret = -1;
+ goto error;
}
chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp;
+ if (msg.u.close_trace_chunk.close_command.is_set) {
+ close_command_name = lttng_trace_chunk_command_type_get_name(
+ close_command);
+ }
DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64
- ", session_id = %" PRIu64
- ", chunk_id = %" PRIu64,
- relayd_id, session_id, chunk_id);
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", close command = \"%s\"",
+ relayd_id, session_id, chunk_id, close_command_name);
health_code_update();
ret = consumer_send_msg(socket, &msg);
}
int session_close_trace_chunk(const struct ltt_session *session,
- struct lttng_trace_chunk *trace_chunk)
+ struct lttng_trace_chunk *trace_chunk,
+ const enum lttng_trace_chunk_command_type *close_command)
{
int ret = 0;
bool error_occurred = false;
enum lttng_trace_chunk_status chunk_status;
const time_t chunk_close_timestamp = time(NULL);
+ if (close_command) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ trace_chunk, *close_command);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+ }
+
if (chunk_close_timestamp == (time_t) -1) {
ERR("Failed to sample the close timestamp of the current trace chunk of session \"%s\"",
session->name);
session_notify_destruction(session);
lttng_dynamic_array_reset(&session->destroy_notifiers);
if (session->current_trace_chunk) {
- ret = session_close_trace_chunk(session, session->current_trace_chunk);
+ ret = session_close_trace_chunk(session, session->current_trace_chunk, NULL);
if (ret) {
ERR("Failed to close the current trace chunk of session \"%s\" during its release",
session->name);
* ltt_session itself.
*/
int session_close_trace_chunk(const struct ltt_session *session,
- struct lttng_trace_chunk *trace_chunk);
+ struct lttng_trace_chunk *trace_chunk,
+ const enum lttng_trace_chunk_command_type *close_command);
#endif /* _LTT_SESSION_H */
DBG("Failed to set new trace chunk on existing channels, rolling back");
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
session_id, chunk_id,
- chunk_creation_timestamp);
+ chunk_creation_timestamp, NULL);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
session_id, chunk_id);
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
session_id,
chunk_id,
- chunk_creation_timestamp);
+ chunk_creation_timestamp,
+ NULL);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
session_id,
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp)
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command)
{
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_trace_chunk *chunk;
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
+ const char *close_command_name = "none";
struct lttng_ht_iter iter;
struct lttng_consumer_channel *channel;
enum lttng_trace_chunk_status chunk_status;
} else {
relayd_id_str = "(formatting error)";
}
- }
+ }
+ if (close_command) {
+ close_command_name = lttng_trace_chunk_command_type_get_name(
+ *close_command);
+ }
DBG("Consumer close trace chunk command: relayd_id = %s"
- ", session_id = %" PRIu64
- ", chunk_id = %" PRIu64, relayd_id_str,
- session_id, chunk_id);
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", close command = %s",
+ relayd_id_str, session_id, chunk_id,
+ close_command_name);
+
chunk = lttng_trace_chunk_registry_find_chunk(
- consumer_data.chunk_registry, session_id,
- chunk_id);
- if (!chunk) {
+ consumer_data.chunk_registry, session_id, chunk_id);
+ if (!chunk) {
ERR("Failed to find chunk: session_id = %" PRIu64
", chunk_id = %" PRIu64,
session_id, chunk_id);
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
goto end;
}
- /*
- * Release the reference returned by the "find" operation and
- * the session daemon's implicit reference to the chunk.
- */
- lttng_trace_chunk_put(chunk);
- lttng_trace_chunk_put(chunk);
+
+ if (close_command) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, *close_command);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ }
/*
* chunk is now invalid to access as we no longer hold a reference to
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
}
}
+
+ if (relayd_id) {
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_close_trace_chunk(
+ &relayd->control_sock, chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+ *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
rcu_read_unlock();
end:
+ /*
+ * Release the reference returned by the "find" operation and
+ * the session daemon's implicit reference to the chunk.
+ */
+ lttng_trace_chunk_put(chunk);
+ lttng_trace_chunk_put(chunk);
+
return ret_code;
}
struct lttng_directory_handle *chunk_directory_handle);
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp);
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command);
enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
const uint64_t *relayd_id, uint64_t session_id,
uint64_t chunk_id);
}
case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
{
+ enum lttng_trace_chunk_command_type close_command =
+ msg.u.close_trace_chunk.close_command.value;
const uint64_t relayd_id =
msg.u.close_trace_chunk.relayd_id.value;
ret_code = lttng_consumer_close_trace_chunk(
msg.u.close_trace_chunk.relayd_id.is_set ?
- &relayd_id : NULL,
+ &relayd_id :
+ NULL,
msg.u.close_trace_chunk.session_id,
msg.u.close_trace_chunk.chunk_id,
- (time_t) msg.u.close_trace_chunk.close_timestamp);
+ (time_t) msg.u.close_trace_chunk.close_timestamp,
+ msg.u.close_trace_chunk.close_command.is_set ?
+ &close_command :
+ NULL);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
}
}
- ret = send_command(sock,
- RELAYD_CREATE_TRACE_CHUNK,
- payload.data,
- payload.size,
- 0);
+ ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
+ payload.size, 0);
if (ret < 0) {
ERR("Failed to send trace chunk creation command to relay daemon");
goto end;
lttng_dynamic_buffer_reset(&payload);
return ret;
}
+
+int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
+ struct lttng_trace_chunk *chunk)
+{
+ int ret = 0;
+ enum lttng_trace_chunk_status status;
+ struct lttcomm_relayd_close_trace_chunk msg = {};
+ struct lttcomm_relayd_generic_reply reply = {};
+ uint64_t chunk_id;
+ time_t close_timestamp;
+ LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
+
+ status = lttng_trace_chunk_get_id(chunk, &chunk_id);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to get trace chunk id");
+ ret = -1;
+ goto end;
+ }
+
+ status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to get trace chunk close timestamp");
+ ret = -1;
+ goto end;
+ }
+
+ status = lttng_trace_chunk_get_close_command(chunk,
+ &close_command.value);
+ switch (status) {
+ case LTTNG_TRACE_CHUNK_STATUS_OK:
+ close_command.is_set = 1;
+ break;
+ case LTTNG_TRACE_CHUNK_STATUS_NONE:
+ break;
+ default:
+ ERR("Failed to get trace chunk close command");
+ ret = -1;
+ goto end;
+ }
+
+ msg = (typeof(msg)){
+ .chunk_id = htobe64(chunk_id),
+ .close_timestamp = htobe64((uint64_t) close_timestamp),
+ .close_command = {
+ .value = htobe32((uint32_t) close_command.value),
+ .is_set = close_command.is_set,
+ },
+ };
+
+ ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
+ 0);
+ if (ret < 0) {
+ ERR("Failed to send trace chunk close command to relay daemon");
+ goto end;
+ }
+
+ ret = recv_reply(sock, &reply, sizeof(reply));
+ if (ret < 0) {
+ ERR("Failed to receive relay daemon trace chunk close command reply");
+ goto end;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd trace chunk close replied error %d",
+ reply.ret_code);
+ } else {
+ ret = 0;
+ DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
+ chunk_id);
+ }
+end:
+ return ret;
+}
uint64_t new_chunk_id, uint64_t seq_num);
int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
struct lttng_trace_chunk *chunk);
+int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
+ struct lttng_trace_chunk *chunk);
#endif /* _RELAYD_H */
char override_name[];
} LTTNG_PACKED;
+struct lttcomm_relayd_close_trace_chunk {
+ uint64_t chunk_id;
+ /* Seconds since EPOCH. */
+ uint64_t close_timestamp;
+ /* enum lttng_trace_chunk_command_type */
+ LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
+} LTTNG_PACKED;
+
#endif /* _RELAYD_COMM */
RELAYD_ROTATE_STREAM = 18,
/* Ask the relay to create a trace chunk (2.11+) */
RELAYD_CREATE_TRACE_CHUNK = 19,
+ /* Ask the relay to close a trace chunk (2.11+) */
+ RELAYD_CLOSE_TRACE_CHUNK = 20,
};
/*
uint64_t session_id;
uint64_t chunk_id;
uint64_t close_timestamp;
+ /* enum lttng_trace_chunk_command_type */
+ LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
} LTTNG_PACKED close_trace_chunk;
struct {
LTTNG_OPTIONAL_COMM(uint64_t) LTTNG_PACKED relayd_id;
LTTNG_OPTIONAL_GET(trace_chunk->timestamp_close);
LTTNG_OPTIONAL(struct lttng_directory_handle) archived_chunks_directory;
- assert(trace_chunk->mode.is_set);
+ if (!trace_chunk->mode.is_set ||
+ trace_chunk->mode.value != TRACE_CHUNK_MODE_OWNER ||
+ !trace_chunk->session_output_directory.is_set) {
+ /*
+ * This command doesn't need to run if the output is remote
+ * or if the trace chunk is not owned by this process.
+ */
+ goto end;
+ }
+
assert(trace_chunk->mode.value == TRACE_CHUNK_MODE_OWNER);
assert(!trace_chunk->name_overriden);
}
}
+LTTNG_HIDDEN
+enum lttng_trace_chunk_status lttng_trace_chunk_get_close_command(
+ struct lttng_trace_chunk *chunk,
+ enum lttng_trace_chunk_command_type *command_type)
+{
+ enum lttng_trace_chunk_status status = LTTNG_TRACE_CHUNK_STATUS_OK;
+
+ pthread_mutex_lock(&chunk->lock);
+ if (chunk->close_command.is_set) {
+ *command_type = chunk->close_command.value;
+ status = LTTNG_TRACE_CHUNK_STATUS_OK;
+ } else {
+ status = LTTNG_TRACE_CHUNK_STATUS_NONE;
+ }
+ pthread_mutex_unlock(&chunk->lock);
+ return status;
+}
+
LTTNG_HIDDEN
enum lttng_trace_chunk_status lttng_trace_chunk_set_close_command(
struct lttng_trace_chunk *chunk,
return status;
}
+LTTNG_HIDDEN
+const char *lttng_trace_chunk_command_type_get_name(
+ enum lttng_trace_chunk_command_type command)
+{
+ switch (command) {
+ case LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED:
+ return "move to completed trace chunk folder";
+ default:
+ abort();
+ }
+}
+
LTTNG_HIDDEN
bool lttng_trace_chunk_get(struct lttng_trace_chunk *chunk)
{
int lttng_trace_chunk_unlink_file(struct lttng_trace_chunk *chunk,
const char *filename);
+LTTNG_HIDDEN
+enum lttng_trace_chunk_status lttng_trace_chunk_get_close_command(
+ struct lttng_trace_chunk *chunk,
+ enum lttng_trace_chunk_command_type *command_type);
+
LTTNG_HIDDEN
enum lttng_trace_chunk_status lttng_trace_chunk_set_close_command(
struct lttng_trace_chunk *chunk,
enum lttng_trace_chunk_command_type command_type);
+LTTNG_HIDDEN
+const char *lttng_trace_chunk_command_type_get_name(
+ enum lttng_trace_chunk_command_type command);
+
/* Returns true on success. */
LTTNG_HIDDEN
bool lttng_trace_chunk_get(struct lttng_trace_chunk *chunk);
}
case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
{
+ enum lttng_trace_chunk_command_type close_command =
+ msg.u.close_trace_chunk.close_command.value;
const uint64_t relayd_id =
msg.u.close_trace_chunk.relayd_id.value;
ret_code = lttng_consumer_close_trace_chunk(
msg.u.close_trace_chunk.relayd_id.is_set ?
- &relayd_id : NULL,
+ &relayd_id :
+ NULL,
msg.u.close_trace_chunk.session_id,
msg.u.close_trace_chunk.chunk_id,
- (time_t) msg.u.close_trace_chunk.close_timestamp);
+ (time_t) msg.u.close_trace_chunk.close_timestamp,
+ msg.u.close_trace_chunk.close_command.is_set ?
+ &close_command :
+ NULL);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: