int ret = 0;
ssize_t send_ret;
struct relay_session *session = NULL;
- struct lttcomm_relayd_status_session reply = {};
+ struct lttcomm_relayd_create_session_reply_2_11 reply = {};
char session_name[LTTNG_NAME_MAX] = {};
char hostname[LTTNG_HOST_NAME_MAX] = {};
uint32_t live_timer = 0;
LTTNG_OPTIONAL(uint64_t) id_sessiond = {};
LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
LTTNG_OPTIONAL(time_t) creation_time = {};
+ struct lttng_dynamic_buffer reply_payload;
+
+ lttng_dynamic_buffer_init(&reply_payload);
if (conn->minor < 4) {
/* From 2.1 to 2.3 */
conn->session = session;
DBG("Created session %" PRIu64, session->id);
- reply.session_id = htobe64(session->id);
+ reply.generic.session_id = htobe64(session->id);
send_reply:
if (ret < 0) {
- reply.ret_code = htobe32(LTTNG_ERR_FATAL);
+ reply.generic.ret_code = htobe32(LTTNG_ERR_FATAL);
} else {
- reply.ret_code = htobe32(LTTNG_OK);
+ reply.generic.ret_code = htobe32(LTTNG_OK);
}
- send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
- if (send_ret < (ssize_t) sizeof(reply)) {
- ERR("Failed to send \"create session\" command reply (ret = %zd)",
- send_ret);
+ if (conn->minor < 11) {
+ /* From 2.1 to 2.10 */
+ ret = lttng_dynamic_buffer_append(&reply_payload,
+ &reply.generic, sizeof(reply.generic));
+ if (ret) {
+ ERR("Failed to append \"create session\" command reply header to payload buffer");
+ ret = -1;
+ goto end;
+ }
+ } else {
+ const uint32_t output_path_length =
+ strlen(session->output_path) + 1;
+
+ reply.output_path_length = htobe32(output_path_length);
+ ret = lttng_dynamic_buffer_append(&reply_payload, &reply,
+ sizeof(reply));
+ if (ret) {
+ ERR("Failed to append \"create session\" command reply header to payload buffer");
+ goto end;
+ }
+
+ ret = lttng_dynamic_buffer_append(&reply_payload,
+ session->output_path, output_path_length);
+ if (ret) {
+ ERR("Failed to append \"create session\" command reply path to payload buffer");
+ goto end;
+ }
+ }
+
+ send_ret = conn->sock->ops->sendmsg(conn->sock, reply_payload.data,
+ reply_payload.size, 0);
+ if (send_ret < (ssize_t) reply_payload.size) {
+ ERR("Failed to send \"create session\" command reply of %zu bytes (ret = %zd)",
+ reply_payload.size, send_ret);
ret = -1;
}
+end:
if (ret < 0 && session) {
session_put(session);
}
+ lttng_dynamic_buffer_reset(&reply_payload);
return ret;
}
} else {
chunk_id_str = chunk_id_buf;
}
+ session->has_rotated = true;
}
DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"",
struct lttng_directory_handle *handle)
{
int ret;
- /*
- * session_directory:
- *
- * if base_path is \0'
- * hostname/session_name
- * else
- * hostname/base_path
- */
- char *session_directory = NULL;
/*
* relayd_output_path/session_directory
* e.g. /home/user/lttng-traces/hostname/session_name
*/
char *full_session_path = NULL;
- /*
- * If base path is set, it overrides the session name for the
- * session relative base path. No timestamp is appended if the
- * base path is overridden.
- *
- * If the session name already contains the creation time (e.g.
- * auto-<timestamp>, don't append yet another timestamp after
- * the session name in the generated path.
- *
- * Otherwise, generate the path with session_name-<timestamp>.
- */
- if (session->base_path[0] != '\0') {
- pthread_mutex_lock(&session->lock);
- ret = asprintf(&session_directory, "%s/%s", session->hostname,
- session->base_path);
- pthread_mutex_unlock(&session->lock);
- } else if (session->session_name_contains_creation_time) {
- pthread_mutex_lock(&session->lock);
- ret = asprintf(&session_directory, "%s/%s", session->hostname,
- session->session_name);
- pthread_mutex_unlock(&session->lock);
- } else {
- char session_creation_datetime[16];
- size_t strftime_ret;
- struct tm *timeinfo;
- time_t creation_time;
-
- /*
- * The 2.11+ protocol guarantees that a creation time
- * is provided for a session. This would indicate a
- * protocol error or an improper use of this util.
- */
- if (!session->creation_time.is_set) {
- ERR("Creation time missing for session \"%s\" (protocol error)",
- session->session_name);
- ret = -1;
- goto end;
- }
- creation_time = LTTNG_OPTIONAL_GET(session->creation_time);
-
- timeinfo = localtime(&creation_time);
- if (!timeinfo) {
- ERR("Failed to get timeinfo while initializing session output directory handle");
- ret = -1;
- goto end;
- }
- strftime_ret = strftime(session_creation_datetime,
- sizeof(session_creation_datetime),
- "%Y%m%d-%H%M%S", timeinfo);
- if (strftime_ret == 0) {
- ERR("Failed to format session creation timestamp while initializing session output directory handle");
- ret = -1;
- goto end;
- }
- pthread_mutex_lock(&session->lock);
- ret = asprintf(&session_directory, "%s/%s-%s",
- session->hostname, session->session_name,
- session_creation_datetime);
- pthread_mutex_unlock(&session->lock);
- }
- if (ret < 0) {
- PERROR("Failed to format session directory name");
- goto end;
- }
-
- full_session_path = create_output_path(session_directory);
+ pthread_mutex_lock(&session->lock);
+ full_session_path = create_output_path(session->output_path);
if (!full_session_path) {
ret = -1;
goto end;
goto end;
}
end:
- free(session_directory);
+ pthread_mutex_unlock(&session->lock);
free(full_session_path);
return ret;
}
ssize_t send_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_close_trace_chunk *msg;
- struct lttcomm_relayd_generic_reply reply = {};
+ struct lttcomm_relayd_close_trace_chunk_reply reply = {};
struct lttng_buffer_view header_view;
struct lttng_trace_chunk *chunk = NULL;
enum lttng_error_code reply_code = LTTNG_OK;
uint64_t chunk_id;
LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
time_t close_timestamp;
+ char closed_trace_chunk_path[LTTNG_PATH_MAX];
+ size_t path_length = 0;
+ const char *chunk_name = NULL;
+ struct lttng_dynamic_buffer reply_payload;
+
+ lttng_dynamic_buffer_init(&reply_payload);
if (!session || !conn->version_check_done) {
ERR("Trying to close a trace chunk before version check");
goto end_unlock_session;
}
}
+ chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, NULL);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to get chunk name");
+ ret = -1;
+ reply_code = LTTNG_ERR_UNK;
+ goto end_unlock_session;
+ }
+ if (!session->has_rotated && !session->snapshot) {
+ ret = lttng_strncpy(closed_trace_chunk_path,
+ session->output_path,
+ sizeof(closed_trace_chunk_path));
+ if (ret) {
+ ERR("Failed to send trace chunk path: path length of %zu bytes exceeds the maximal allowed length of %zu bytes",
+ strlen(session->output_path),
+ sizeof(closed_trace_chunk_path));
+ reply_code = LTTNG_ERR_NOMEM;
+ ret = -1;
+ goto end_unlock_session;
+ }
+ } else {
+ if (session->snapshot) {
+ ret = snprintf(closed_trace_chunk_path,
+ sizeof(closed_trace_chunk_path),
+ "%s/%s", session->output_path,
+ chunk_name);
+ } else {
+ ret = snprintf(closed_trace_chunk_path,
+ sizeof(closed_trace_chunk_path),
+ "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY
+ "/%s",
+ session->output_path, chunk_name);
+ }
+ if (ret < 0 || ret == sizeof(closed_trace_chunk_path)) {
+ ERR("Failed to format closed trace chunk resulting path");
+ reply_code = ret < 0 ? LTTNG_ERR_UNK : LTTNG_ERR_NOMEM;
+ ret = -1;
+ goto end_unlock_session;
+ }
+ }
+ DBG("Reply chunk path on close: %s", closed_trace_chunk_path);
+ path_length = strlen(closed_trace_chunk_path) + 1;
+ if (path_length > UINT32_MAX) {
+ ERR("Closed trace chunk path exceeds the maximal length allowed by the protocol");
+ ret = -1;
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+ goto end_unlock_session;
+ }
if (session->current_trace_chunk == chunk) {
/*
pthread_mutex_unlock(&session->lock);
end:
- reply.ret_code = htobe32((uint32_t) reply_code);
+ reply.generic.ret_code = htobe32((uint32_t) reply_code);
+ reply.path_length = htobe32((uint32_t) path_length);
+ ret = lttng_dynamic_buffer_append(
+ &reply_payload, &reply, sizeof(reply));
+ if (ret) {
+ ERR("Failed to append \"close trace chunk\" command reply header to payload buffer");
+ goto end_no_reply;
+ }
+
+ if (reply_code == LTTNG_OK) {
+ ret = lttng_dynamic_buffer_append(&reply_payload,
+ closed_trace_chunk_path, path_length);
+ if (ret) {
+ ERR("Failed to append \"close trace chunk\" command reply path to payload buffer");
+ goto end_no_reply;
+ }
+ }
+
send_ret = conn->sock->ops->sendmsg(conn->sock,
- &reply,
- sizeof(struct lttcomm_relayd_generic_reply),
+ reply_payload.data,
+ reply_payload.size,
0);
- if (send_ret < (ssize_t) sizeof(reply)) {
- ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
- send_ret);
+ if (send_ret < reply_payload.size) {
+ ERR("Failed to send \"close trace chunk\" command reply of %zu bytes (ret = %zd)",
+ reply_payload.size, send_ret);
ret = -1;
+ goto end_no_reply;
}
end_no_reply:
lttng_trace_chunk_put(chunk);
+ lttng_dynamic_buffer_reset(&reply_payload);
return ret;
}
static uint64_t last_relay_session_id;
static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
+static int init_session_output_path(struct relay_session *session)
+{
+ /*
+ * session_directory:
+ *
+ * if base_path is \0'
+ * hostname/session_name
+ * else
+ * hostname/base_path
+ */
+ char *session_directory = NULL;
+ int ret = 0;
+
+ if (session->output_path[0] != '\0') {
+ goto end;
+ }
+ /*
+ * If base path is set, it overrides the session name for the
+ * session relative base path. No timestamp is appended if the
+ * base path is overridden.
+ *
+ * If the session name already contains the creation time (e.g.
+ * auto-<timestamp>, don't append yet another timestamp after
+ * the session name in the generated path.
+ *
+ * Otherwise, generate the path with session_name-<timestamp>.
+ */
+ if (session->base_path[0] != '\0') {
+ ret = asprintf(&session_directory, "%s/%s", session->hostname,
+ session->base_path);
+ } else if (session->session_name_contains_creation_time) {
+ ret = asprintf(&session_directory, "%s/%s", session->hostname,
+ session->session_name);
+ } else {
+ char session_creation_datetime[16];
+ size_t strftime_ret;
+ struct tm *timeinfo;
+ time_t creation_time;
+
+ /*
+ * The 2.11+ protocol guarantees that a creation time
+ * is provided for a session. This would indicate a
+ * protocol error or an improper use of this util.
+ */
+ if (!session->creation_time.is_set) {
+ ERR("Creation time missing for session \"%s\" (protocol error)",
+ session->session_name);
+ ret = -1;
+ goto end;
+ }
+ creation_time = LTTNG_OPTIONAL_GET(session->creation_time);
+
+ timeinfo = localtime(&creation_time);
+ if (!timeinfo) {
+ ERR("Failed to get timeinfo while initializing session output directory handle");
+ ret = -1;
+ goto end;
+ }
+ strftime_ret = strftime(session_creation_datetime,
+ sizeof(session_creation_datetime),
+ "%Y%m%d-%H%M%S", timeinfo);
+ if (strftime_ret == 0) {
+ ERR("Failed to format session creation timestamp while initializing session output directory handle");
+ ret = -1;
+ goto end;
+ }
+ ret = asprintf(&session_directory, "%s/%s-%s",
+ session->hostname, session->session_name,
+ session_creation_datetime);
+ }
+ if (ret < 0) {
+ PERROR("Failed to format session directory name");
+ goto end;
+ }
+
+ if (strlen(session_directory) >= LTTNG_PATH_MAX) {
+ ERR("Session output directory exceeds maximal length");
+ ret = -1;
+ goto end;
+ }
+ strcpy(session->output_path, session_directory);
+ ret = 0;
+
+end:
+ free(session_directory);
+ return ret;
+}
+
static int session_set_anonymous_chunk(struct relay_session *session)
{
int ret = 0;
LTTNG_OPTIONAL_SET(&session->id_sessiond, *id_sessiond);
}
+ ret = init_session_output_path(session);
+ if (ret) {
+ goto error;
+ }
ret = sessiond_trace_chunk_registry_session_created(
sessiond_trace_chunk_registry, sessiond_uuid);
if (ret) {
char session_name[LTTNG_NAME_MAX];
char hostname[LTTNG_HOST_NAME_MAX];
char base_path[LTTNG_PATH_MAX];
+ /*
+ * Session output path relative to relayd's output path.
+ * Will be empty when interacting with peers < 2.11 since their
+ * streams' path are expressed relative to the relay daemon's
+ * output path.
+ */
+ char output_path[LTTNG_PATH_MAX];
uint32_t live_timer;
/* Session in snapshot mode. */
bool aborted;
bool session_name_contains_creation_time;
+ /* Whether session has performed an explicit rotation. */
+ bool has_rotated;
/* Contains ctf_trace object of that session indexed by path name. */
struct lttng_ht *ctf_traces_ht;
}
if (session_close_trace_chunk(
- session, session->current_trace_chunk, NULL)) {
+ session, session->current_trace_chunk, NULL, NULL)) {
/*
* Don't goto end; make sure the chunk is closed for the session
* to allow future snapshots.
quiet_rotation ?
NULL :
&((enum lttng_trace_chunk_command_type){
- LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED}));
+ LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED}),
+ session->last_chunk_path);
if (ret) {
cmd_ret = LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
goto error;
sizeof(info_return->location.local.absolute_path);
info_return->location_type =
(int8_t) LTTNG_TRACE_ARCHIVE_LOCATION_TYPE_LOCAL;
+ fmt_ret = asprintf(&chunk_path,
+ "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s",
+ session_get_base_path(session),
+ session->last_archived_chunk_name);
+ if (fmt_ret == -1) {
+ PERROR("Failed to format the path of the last archived trace chunk");
+ info_return->status = LTTNG_ROTATION_STATUS_ERROR;
+ cmd_ret = LTTNG_ERR_UNK;
+ goto end;
+ }
break;
case CONSUMER_DST_NET:
current_tracing_path_reply =
&info_return->location.relay.ports.data);
info_return->location_type =
(int8_t) LTTNG_TRACE_ARCHIVE_LOCATION_TYPE_RELAY;
+ chunk_path = strdup(session->last_chunk_path);
+ if (!chunk_path) {
+ ERR("Failed to allocate the path of the last archived trace chunk");
+ info_return->status = LTTNG_ROTATION_STATUS_ERROR;
+ cmd_ret = LTTNG_ERR_UNK;
+ goto end;
+ }
break;
default:
abort();
}
- fmt_ret = asprintf(&chunk_path,
- "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s",
- session_get_base_path(session),
- session->last_archived_chunk_name);
- if (fmt_ret == -1) {
- PERROR("Failed to format the path of the last archived trace chunk");
- info_return->status = LTTNG_ROTATION_STATUS_ERROR;
- cmd_ret = LTTNG_ERR_UNK;
- goto end;
- }
fmt_ret = lttng_strncpy(current_tracing_path_reply,
chunk_path, current_tracing_path_reply_len);
}
if (type == LTTNG_STREAM_CONTROL) {
+ char output_path[LTTNG_PATH_MAX] = {};
+
ret = relayd_create_session(rsock,
&msg.u.relayd_sock.relayd_session_id,
session_name, hostname, base_path,
consumer->snapshot, session_id,
sessiond_uuid, current_chunk_id,
session_creation_time,
- session_name_contains_creation_time);
+ session_name_contains_creation_time,
+ output_path);
if (ret < 0) {
/* Close the control socket. */
(void) relayd_close(rsock);
goto error;
}
+ DBG("Created session on relay, output path reply: %s",
+ output_path);
}
msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
*/
int consumer_close_trace_chunk(struct consumer_socket *socket,
uint64_t relayd_id, uint64_t session_id,
- struct lttng_trace_chunk *chunk)
+ struct lttng_trace_chunk *chunk,
+ char *closed_trace_chunk_path)
{
int ret;
enum lttng_trace_chunk_status chunk_status;
.cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
.u.close_trace_chunk.session_id = session_id,
};
+ struct lttcomm_consumer_close_trace_chunk_reply reply;
uint64_t chunk_id;
time_t close_timestamp;
enum lttng_trace_chunk_command_type close_command;
const char *close_command_name = "none";
+ struct lttng_dynamic_buffer path_reception_buffer;
assert(socket);
+ lttng_dynamic_buffer_init(&path_reception_buffer);
if (relayd_id != -1ULL) {
LTTNG_OPTIONAL_SET(
relayd_id, session_id, chunk_id, close_command_name);
health_code_update();
- ret = consumer_send_msg(socket, &msg);
+ ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
goto error;
}
-
+ ret = consumer_socket_recv(socket, &reply, sizeof(reply));
+ if (ret < 0) {
+ ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
+ goto error;
+ }
+ if (reply.path_length >= LTTNG_PATH_MAX) {
+ ERR("Invalid path returned by relay daemon: %" PRIu32 "bytes exceeds maximal allowed length of %d bytes",
+ reply.path_length, LTTNG_PATH_MAX);
+ ret = -LTTNG_ERR_INVALID_PROTOCOL;
+ goto error;
+ }
+ ret = lttng_dynamic_buffer_set_size(&path_reception_buffer,
+ reply.path_length);
+ if (ret) {
+ ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command");
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
+ ret = consumer_socket_recv(socket, path_reception_buffer.data,
+ path_reception_buffer.size);
+ if (ret < 0) {
+ ERR("Communication error while receiving path of closed trace chunk");
+ ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
+ goto error;
+ }
+ if (path_reception_buffer.data[path_reception_buffer.size - 1] != '\0') {
+ ERR("Invalid path returned by relay daemon: not null-terminated");
+ ret = -LTTNG_ERR_INVALID_PROTOCOL;
+ goto error;
+ }
+ if (closed_trace_chunk_path) {
+ /*
+ * closed_trace_chunk_path is assumed to have a length >=
+ * LTTNG_PATH_MAX
+ */
+ memcpy(closed_trace_chunk_path, path_reception_buffer.data,
+ path_reception_buffer.size);
+ }
error:
+ lttng_dynamic_buffer_reset(&path_reception_buffer);
health_code_update();
return ret;
}
struct lttng_trace_chunk *chunk);
int consumer_close_trace_chunk(struct consumer_socket *socket,
uint64_t relayd_id, uint64_t session_id,
- struct lttng_trace_chunk *chunk);
+ struct lttng_trace_chunk *chunk,
+ char *closed_trace_chunk_path);
int consumer_trace_chunk_exists(struct consumer_socket *socket,
uint64_t relayd_id, uint64_t session_id,
struct lttng_trace_chunk *chunk,
goto end;
}
- ret = asprintf(&chunk_path, "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s",
- session_get_base_path(session),
- session->last_archived_chunk_name);
- if (ret == -1) {
- goto end;
- }
-
switch (session_get_consumer_destination_type(session)) {
case CONSUMER_DST_LOCAL:
+ ret = asprintf(&chunk_path,
+ "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s",
+ session_get_base_path(session),
+ session->last_archived_chunk_name);
+ if (ret == -1) {
+ goto end;
+ }
location = lttng_trace_archive_location_local_create(
chunk_path);
break;
location = lttng_trace_archive_location_relay_create(
hostname,
LTTNG_TRACE_ARCHIVE_LOCATION_RELAY_PROTOCOL_TYPE_TCP,
- control_port, data_port, chunk_path);
+ control_port, data_port, session->last_chunk_path);
break;
}
default:
int session_close_trace_chunk(const struct ltt_session *session,
struct lttng_trace_chunk *trace_chunk,
- const enum lttng_trace_chunk_command_type *close_command)
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *closed_trace_chunk_path)
{
int ret = 0;
bool error_occurred = false;
ret = consumer_close_trace_chunk(socket,
relayd_id,
session->id,
- trace_chunk);
+ trace_chunk, closed_trace_chunk_path);
pthread_mutex_unlock(socket->lock);
if (ret) {
ERR("Failed to close trace chunk on user space consumer");
ret = consumer_close_trace_chunk(socket,
relayd_id,
session->id,
- trace_chunk);
+ trace_chunk, closed_trace_chunk_path);
pthread_mutex_unlock(socket->lock);
if (ret) {
ERR("Failed to close trace chunk on kernel consumer");
bool has_auto_generated_name;
bool name_contains_creation_time;
char hostname[HOST_NAME_MAX]; /* Local hostname. */
+ /* Path of the last closed chunk. */
+ char last_chunk_path[LTTNG_PATH_MAX];
time_t creation_time;
struct ltt_kernel_session *kernel_session;
struct ltt_ust_session *ust_session;
*/
int session_close_trace_chunk(const struct ltt_session *session,
struct lttng_trace_chunk *trace_chunk,
- const enum lttng_trace_chunk_command_type *close_command);
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path);
bool session_output_supports_trace_chunks(const struct ltt_session *session);
* channels.
*/
enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
DBG("Failed to set new trace chunk on existing channels, rolling back");
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
session_id, chunk_id,
- chunk_creation_timestamp, NULL);
+ chunk_creation_timestamp, NULL,
+ path);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
session_id, chunk_id);
if (!relayd || ret) {
enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
session_id,
chunk_id,
chunk_creation_timestamp,
- NULL);
+ NULL, path);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
session_id,
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
const uint64_t *relayd_id, uint64_t session_id,
uint64_t chunk_id, time_t chunk_close_timestamp,
- const enum lttng_trace_chunk_command_type *close_command)
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path)
{
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_trace_chunk *chunk;
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_close_trace_chunk(
- &relayd->control_sock, chunk);
+ &relayd->control_sock, chunk,
+ path);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
const uint64_t *relayd_id, uint64_t session_id,
uint64_t chunk_id, time_t chunk_close_timestamp,
- const enum lttng_trace_chunk_command_type *close_command);
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path);
enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
const uint64_t *relayd_id, uint64_t session_id,
uint64_t chunk_id);
msg.u.close_trace_chunk.close_command.value;
const uint64_t relayd_id =
msg.u.close_trace_chunk.relayd_id.value;
+ struct lttcomm_consumer_close_trace_chunk_reply reply;
+ char path[LTTNG_PATH_MAX];
ret_code = lttng_consumer_close_trace_chunk(
msg.u.close_trace_chunk.relayd_id.is_set ?
(time_t) msg.u.close_trace_chunk.close_timestamp,
msg.u.close_trace_chunk.close_command.is_set ?
&close_command :
- NULL);
- goto end_msg_sessiond;
+ NULL, path);
+ reply.ret_code = ret_code;
+ reply.path_length = strlen(path) + 1;
+ ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
+ if (ret != sizeof(reply)) {
+ goto error_fatal;
+ }
+ ret = lttcomm_send_unix_sock(sock, path, reply.path_length);
+ if (ret != reply.path_length) {
+ goto error_fatal;
+ }
+ goto end_nosignal;
}
case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
{
const char *base_path, int session_live_timer,
unsigned int snapshot, uint64_t sessiond_session_id,
const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id,
- time_t creation_time, bool session_name_contains_creation_time)
+ time_t creation_time, bool session_name_contains_creation_time,
+ struct lttcomm_relayd_create_session_reply_2_11 *reply,
+ char *output_path)
{
int ret;
struct lttcomm_relayd_create_session_2_11 *msg = NULL;
if (ret < 0) {
goto error;
}
+ /* Receive response */
+ ret = recv_reply(rsock, reply, sizeof(*reply));
+ if (ret < 0) {
+ goto error;
+ }
+ reply->generic.session_id = be64toh(reply->generic.session_id);
+ reply->generic.ret_code = be32toh(reply->generic.ret_code);
+ reply->output_path_length = be32toh(reply->output_path_length);
+ if (reply->output_path_length >= LTTNG_PATH_MAX) {
+ ERR("Invalid session output path length in reply (%" PRIu32 " bytes) exceeds maximal allowed length (%d bytes)",
+ reply->output_path_length, LTTNG_PATH_MAX);
+ ret = -1;
+ goto error;
+ }
+ ret = recv_reply(rsock, output_path, reply->output_path_length);
+ if (ret < 0) {
+ goto error;
+ }
error:
free(msg);
return ret;
*/
static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
const char *session_name, const char *hostname,
- int session_live_timer, unsigned int snapshot)
+ int session_live_timer, unsigned int snapshot,
+ struct lttcomm_relayd_status_session *reply)
{
int ret;
struct lttcomm_relayd_create_session_2_4 msg;
goto error;
}
+ /* Receive response */
+ ret = recv_reply(rsock, reply, sizeof(*reply));
+ if (ret < 0) {
+ goto error;
+ }
+ reply->session_id = be64toh(reply->session_id);
+ reply->ret_code = be32toh(reply->ret_code);
error:
return ret;
}
/*
* RELAYD_CREATE_SESSION from 2.1 to 2.3.
*/
-static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
+static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
+ struct lttcomm_relayd_status_session *reply)
{
int ret;
goto error;
}
+ /* Receive response */
+ ret = recv_reply(rsock, reply, sizeof(*reply));
+ if (ret < 0) {
+ goto error;
+ }
+ reply->session_id = be64toh(reply->session_id);
+ reply->ret_code = be32toh(reply->ret_code);
error:
return ret;
}
unsigned int snapshot, uint64_t sessiond_session_id,
const lttng_uuid sessiond_uuid,
const uint64_t *current_chunk_id,
- time_t creation_time, bool session_name_contains_creation_time)
+ time_t creation_time, bool session_name_contains_creation_time,
+ char *output_path)
{
int ret;
- struct lttcomm_relayd_status_session reply;
+ struct lttcomm_relayd_create_session_reply_2_11 reply = {};
assert(rsock);
assert(relayd_session_id);
if (rsock->minor < 4) {
/* From 2.1 to 2.3 */
- ret = relayd_create_session_2_1(rsock);
+ ret = relayd_create_session_2_1(rsock, &reply.generic);
} else if (rsock->minor >= 4 && rsock->minor < 11) {
/* From 2.4 to 2.10 */
ret = relayd_create_session_2_4(rsock, session_name,
- hostname, session_live_timer, snapshot);
+ hostname, session_live_timer, snapshot,
+ &reply.generic);
} else {
/* From 2.11 to ... */
ret = relayd_create_session_2_11(rsock, session_name,
hostname, base_path, session_live_timer, snapshot,
sessiond_session_id, sessiond_uuid,
current_chunk_id, creation_time,
- session_name_contains_creation_time);
+ session_name_contains_creation_time,
+ &reply, output_path);
}
if (ret < 0) {
goto error;
}
- /* Receive response */
- ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
- if (ret < 0) {
- goto error;
- }
-
- reply.session_id = be64toh(reply.session_id);
- reply.ret_code = be32toh(reply.ret_code);
-
/* Return session id or negative ret code. */
- if (reply.ret_code != LTTNG_OK) {
+ if (reply.generic.ret_code != LTTNG_OK) {
ret = -1;
- ERR("Relayd create session replied error %d", reply.ret_code);
+ ERR("Relayd create session replied error %d",
+ reply.generic.ret_code);
goto error;
} else {
ret = 0;
- *relayd_session_id = reply.session_id;
+ *relayd_session_id = reply.generic.session_id;
}
- DBG("Relayd session created with id %" PRIu64, reply.session_id);
+ DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
error:
return ret;
}
int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
- struct lttng_trace_chunk *chunk)
+ struct lttng_trace_chunk *chunk,
+ char *path)
{
int ret = 0;
enum lttng_trace_chunk_status status;
struct lttcomm_relayd_close_trace_chunk msg = {};
- struct lttcomm_relayd_generic_reply reply = {};
+ struct lttcomm_relayd_close_trace_chunk_reply reply = {};
uint64_t chunk_id;
time_t close_timestamp;
LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
goto end;
}
- reply.ret_code = be32toh(reply.ret_code);
- if (reply.ret_code != LTTNG_OK) {
+ reply.path_length = be32toh(reply.path_length);
+ if (reply.path_length >= LTTNG_PATH_MAX) {
+ ERR("Chunk path too long");
+ ret = -1;
+ goto end;
+ }
+
+ ret = recv_reply(sock, path, reply.path_length);
+ if (ret < 0) {
+ ERR("Failed to receive relay daemon trace chunk close command reply");
+ goto end;
+ }
+ if (path[reply.path_length - 1] != '\0') {
+ ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
+ ret = -1;
+ goto end;
+ }
+
+ reply.generic.ret_code = be32toh(reply.generic.ret_code);
+ if (reply.generic.ret_code != LTTNG_OK) {
ret = -1;
ERR("Relayd trace chunk close replied error %d",
- reply.ret_code);
+ reply.generic.ret_code);
} else {
ret = 0;
DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
unsigned int snapshot, uint64_t sessiond_session_id,
const lttng_uuid sessiond_uuid,
const uint64_t *current_chunk_id,
- time_t creation_time, bool session_name_contains_creation_time);
+ time_t creation_time, bool session_name_contains_creation_time,
+ char *output_path);
int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name,
const char *pathname, uint64_t *stream_id,
uint64_t tracefile_size, uint64_t tracefile_count,
int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
struct lttng_trace_chunk *chunk);
int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
- struct lttng_trace_chunk *chunk);
+ struct lttng_trace_chunk *chunk,
+ char *path);
int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
uint64_t chunk_id, bool *chunk_exists);
char names[];
} LTTNG_PACKED;
+struct lttcomm_relayd_create_session_reply_2_11 {
+ struct lttcomm_relayd_status_session generic;
+ /* Includes the '\0' terminator. */
+ uint32_t output_path_length;
+ char output_path[];
+} LTTNG_PACKED;
+
/*
* Used to ask the relay to reset the metadata trace file (regeneration).
* Send the new version of the metadata (starts at 0).
LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
} LTTNG_PACKED;
+struct lttcomm_relayd_close_trace_chunk_reply {
+ struct lttcomm_relayd_generic_reply generic;
+ /* Includes trailing NULL. */
+ uint32_t path_length;
+ char path[];
+} LTTNG_PACKED;
+
struct lttcomm_relayd_trace_chunk_exists {
uint64_t chunk_id;
} LTTNG_PACKED;
#ifdef HAVE_LIBLTTNG_UST_CTL
+struct lttcomm_consumer_close_trace_chunk_reply {
+ enum lttcomm_return_code ret_code;
+ uint32_t path_length;
+ char path[];
+};
+
#include <lttng/ust-abi.h>
/*
goto end;
}
LTTNG_OPTIONAL_SET(&chunk->timestamp_close, close_ts);
- free(chunk->name);
- chunk->name = generate_chunk_name(LTTNG_OPTIONAL_GET(chunk->id),
- LTTNG_OPTIONAL_GET(chunk->timestamp_creation),
- &close_ts);
- if (!chunk->name) {
- status = LTTNG_TRACE_CHUNK_STATUS_ERROR;
+ if (!chunk->name_overridden) {
+ free(chunk->name);
+ chunk->name = generate_chunk_name(LTTNG_OPTIONAL_GET(chunk->id),
+ LTTNG_OPTIONAL_GET(chunk->timestamp_creation),
+ &close_ts);
+ if (!chunk->name) {
+ status = LTTNG_TRACE_CHUNK_STATUS_ERROR;
+ }
}
end:
pthread_mutex_unlock(&chunk->lock);
msg.u.close_trace_chunk.close_command.value;
const uint64_t relayd_id =
msg.u.close_trace_chunk.relayd_id.value;
+ struct lttcomm_consumer_close_trace_chunk_reply reply;
+ char closed_trace_chunk_path[LTTNG_PATH_MAX];
+ int ret;
ret_code = lttng_consumer_close_trace_chunk(
msg.u.close_trace_chunk.relayd_id.is_set ?
(time_t) msg.u.close_trace_chunk.close_timestamp,
msg.u.close_trace_chunk.close_command.is_set ?
&close_command :
- NULL);
- goto end_msg_sessiond;
+ NULL, closed_trace_chunk_path);
+ reply.ret_code = ret_code;
+ reply.path_length = strlen(closed_trace_chunk_path) + 1;
+ ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
+ if (ret != sizeof(reply)) {
+ goto error_fatal;
+ }
+ ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path,
+ reply.path_length);
+ if (ret != reply.path_length) {
+ goto error_fatal;
+ }
+ goto end_nosignal;
}
case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
{