size_t path_len;
struct lttng_buffer_view new_path_view;
- DBG("Rotate stream received");
-
if (!session || !conn->version_check_done) {
ERR("Trying to rotate a stream before version check");
ret = -1;
return ret;
}
+static int init_session_output_directory_handle(struct relay_session *session,
+ struct lttng_directory_handle *handle)
+{
+ int ret;
+ /* hostname/session_name */
+ char *session_directory = NULL;
+ /*
+ * base path + session_directory
+ * e.g. /home/user/lttng-traces/hostname/session_name
+ */
+ char *full_session_path = NULL;
+
+ pthread_mutex_lock(&session->lock);
+ ret = asprintf(&session_directory, "%s/%s", session->hostname,
+ session->session_name);
+ pthread_mutex_unlock(&session->lock);
+ if (ret < 0) {
+ PERROR("Failed to format session directory name");
+ goto end;
+ }
+
+ full_session_path = create_output_path(session_directory);
+ if (!full_session_path) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = utils_mkdir_recursive(
+ full_session_path, S_IRWXU | S_IRWXG, -1, -1);
+ if (ret) {
+ ERR("Failed to create session output path \"%s\"",
+ full_session_path);
+ goto end;
+ }
+
+ ret = lttng_directory_handle_init(handle, full_session_path);
+ if (ret) {
+ goto end;
+ }
+end:
+ free(session_directory);
+ free(full_session_path);
+ return ret;
+}
+
+/*
+ * relay_create_trace_chunk: create a new trace chunk
+ */
+static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret = 0;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_create_trace_chunk *msg;
+ struct lttcomm_relayd_generic_reply reply = {};
+ struct lttng_buffer_view header_view;
+ struct lttng_buffer_view chunk_name_view;
+ struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL;
+ enum lttng_error_code reply_code = LTTNG_OK;
+ enum lttng_trace_chunk_status chunk_status;
+ struct lttng_directory_handle session_output;
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to create a trace chunk before version check");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ if (session->major == 2 && session->minor < 11) {
+ ERR("Chunk creation command is unsupported before 2.11");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+ if (!header_view.data) {
+ ERR("Failed to receive payload of chunk creation command");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ /* Convert to host endianness. */
+ msg = (typeof(msg)) header_view.data;
+ msg->chunk_id = be64toh(msg->chunk_id);
+ msg->creation_timestamp = be64toh(msg->creation_timestamp);
+ msg->override_name_length = be32toh(msg->override_name_length);
+
+ chunk = lttng_trace_chunk_create(
+ msg->chunk_id, msg->creation_timestamp);
+ if (!chunk) {
+ ERR("Failed to create trace chunk in trace chunk creation command");
+ ret = -1;
+ reply_code = LTTNG_ERR_NOMEM;
+ goto end;
+ }
+
+ if (msg->override_name_length) {
+ const char *name;
+
+ chunk_name_view = lttng_buffer_view_from_view(payload,
+ sizeof(*msg),
+ msg->override_name_length);
+ name = chunk_name_view.data;
+ if (!name || name[msg->override_name_length - 1]) {
+ ERR("Failed to receive payload of chunk creation command");
+ ret = -1;
+ reply_code = LTTNG_ERR_INVALID;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_override_name(
+ chunk, chunk_name_view.data);
+ switch (chunk_status) {
+ case LTTNG_TRACE_CHUNK_STATUS_OK:
+ break;
+ case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT:
+ ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)");
+ reply_code = LTTNG_ERR_INVALID;
+ ret = -1;
+ goto end;
+ default:
+ ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)");
+ reply_code = LTTNG_ERR_UNK;
+ ret = -1;
+ goto end;
+ }
+ }
+
+ ret = init_session_output_directory_handle(
+ conn->session, &session_output);
+ if (ret) {
+ reply_code = LTTNG_ERR_CREATE_DIR_FAIL;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ reply_code = LTTNG_ERR_UNK;
+ ret = -1;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ reply_code = LTTNG_ERR_UNK;
+ ret = -1;
+ goto end;
+ }
+
+ published_chunk = sessiond_trace_chunk_registry_publish_chunk(
+ sessiond_trace_chunk_registry,
+ conn->session->sessiond_uuid,
+ conn->session->id,
+ chunk);
+ if (!published_chunk) {
+ char uuid_str[UUID_STR_LEN];
+
+ lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+ ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ uuid_str,
+ conn->session->id,
+ msg->chunk_id);
+ ret = -1;
+ reply_code = LTTNG_ERR_NOMEM;
+ goto end;
+ }
+
+ pthread_mutex_lock(&conn->session->lock);
+ lttng_trace_chunk_put(conn->session->current_trace_chunk);
+ conn->session->current_trace_chunk = published_chunk;
+ pthread_mutex_unlock(&conn->session->lock);
+ published_chunk = NULL;
+
+end:
+ reply.ret_code = htobe32((uint32_t) reply_code);
+ send_ret = conn->sock->ops->sendmsg(conn->sock,
+ &reply,
+ sizeof(struct lttcomm_relayd_generic_reply),
+ 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+end_no_reply:
+ lttng_trace_chunk_put(chunk);
+ lttng_trace_chunk_put(published_chunk);
+ lttng_directory_handle_fini(&session_output);
+ return ret;
+}
+
#define DBG_CMD(cmd_name, conn) \
DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
DBG_CMD("RELAYD_ROTATE_STREAM", conn);
ret = relay_rotate_session_stream(header, conn, payload);
break;
+ case RELAYD_CREATE_TRACE_CHUNK:
+ DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
+ ret = relay_create_trace_chunk(header, conn, payload);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", header->cmd);
goto error;
}
msg.u.create_trace_chunk.chunk_id = chunk_id;
- /* Only used for logging purposes. */
if (chunk_has_local_output) {
chunk_status = lttng_trace_chunk_get_chunk_directory_handle(
chunk_dirfd = lttng_directory_handle_get_dirfd(
chunk_directory_handle);
assert(chunk_dirfd >= 0);
- }
- chunk_status = lttng_trace_chunk_get_credentials(chunk,
- &chunk_credentials);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
- /*
- * Not associating credentials to a sessiond chunk is a fatal
- * internal error.
- */
- ret = -LTTNG_ERR_FATAL;
- goto error;
+ chunk_status = lttng_trace_chunk_get_credentials(
+ chunk, &chunk_credentials);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ /*
+ * Not associating credentials to a sessiond chunk is a
+ * fatal internal error.
+ */
+ ret = -LTTNG_ERR_FATAL;
+ goto error;
+ }
+ msg.u.create_trace_chunk.credentials.value.uid =
+ chunk_credentials.uid;
+ msg.u.create_trace_chunk.credentials.value.gid =
+ chunk_credentials.gid;
+ msg.u.create_trace_chunk.credentials.is_set = 1;
}
- msg.u.create_trace_chunk.credentials.uid = chunk_credentials.uid;
- msg.u.create_trace_chunk.credentials.gid = chunk_credentials.gid;
DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64
", session_id = %" PRIu64 ", chunk_id = %" PRIu64
struct lttng_trace_chunk *current_trace_chunk;
uint64_t chunk_id;
enum lttng_trace_chunk_status chunk_status;
- const uint64_t relayd_id = session->consumer->net_seq_index;
- const bool is_local_trace = relayd_id == -1ULL;
rcu_read_lock();
/*
}
if (session->ust_session) {
+ const uint64_t relayd_id =
+ session->ust_session->consumer->net_seq_index;
+ const bool is_local_trace =
+ session->ust_session->consumer->type ==
+ CONSUMER_DST_LOCAL;
+
session->ust_session->current_trace_chunk = new_trace_chunk;
if (is_local_trace) {
enum lttng_error_code ret_error_code;
}
}
if (session->kernel_session) {
+ const uint64_t relayd_id =
+ session->kernel_session->consumer->net_seq_index;
+ const bool is_local_trace =
+ session->kernel_session->consumer->type ==
+ CONSUMER_DST_LOCAL;
+
session->kernel_session->current_trace_chunk = new_trace_chunk;
if (is_local_trace) {
enum lttng_error_code ret_error_code;
break;
}
}
- rcu_read_unlock();
+ if (relayd_id) {
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_create_trace_chunk(
+ &relayd->control_sock, published_chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ enum lttcomm_return_code close_ret;
+
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+error:
+ rcu_read_unlock();
/* Release the reference returned by the "publish" operation. */
lttng_trace_chunk_put(published_chunk);
end:
case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
{
const struct lttng_credentials credentials = {
- .uid = msg.u.create_trace_chunk.credentials.uid,
- .gid = msg.u.create_trace_chunk.credentials.gid,
+ .uid = msg.u.create_trace_chunk.credentials.value.uid,
+ .gid = msg.u.create_trace_chunk.credentials.value.gid,
};
const bool is_local_trace =
!msg.u.create_trace_chunk.relayd_id.is_set;
!is_local_trace ? &relayd_id : NULL,
msg.u.create_trace_chunk.session_id,
msg.u.create_trace_chunk.chunk_id,
- (time_t) msg.u.create_trace_chunk.creation_timestamp,
+ (time_t) msg.u.create_trace_chunk
+ .creation_timestamp,
chunk_override_name,
- &credentials,
+ msg.u.create_trace_chunk.credentials.is_set ?
+ &credentials :
+ NULL,
chunk_directory_handle.is_set ?
&chunk_directory_handle.value :
NULL);
free(msg);
return ret;
}
+
+int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
+ struct lttng_trace_chunk *chunk)
+{
+ int ret = 0;
+ enum lttng_trace_chunk_status status;
+ struct lttcomm_relayd_create_trace_chunk msg = {};
+ struct lttcomm_relayd_generic_reply reply = {};
+ struct lttng_dynamic_buffer payload;
+ uint64_t chunk_id;
+ time_t creation_timestamp;
+ const char *chunk_name;
+ size_t chunk_name_length;
+ bool overriden_name;
+
+ lttng_dynamic_buffer_init(&payload);
+
+ status = lttng_trace_chunk_get_id(chunk, &chunk_id);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+
+ status = lttng_trace_chunk_get_creation_timestamp(
+ chunk, &creation_timestamp);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+
+ status = lttng_trace_chunk_get_name(
+ chunk, &chunk_name, &overriden_name);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
+ status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
+ ret = -1;
+ goto end;
+ }
+
+ chunk_name_length = overriden_name ? (strlen(chunk_name) + 1) : 0;
+ msg = (typeof(msg)){
+ .chunk_id = htobe64(chunk_id),
+ .creation_timestamp = htobe64((uint64_t) creation_timestamp),
+ .override_name_length = htobe32((uint32_t) chunk_name_length),
+ };
+
+ ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
+ if (ret) {
+ goto end;
+ }
+ if (chunk_name_length) {
+ ret = lttng_dynamic_buffer_append(
+ &payload, chunk_name, chunk_name_length);
+ if (ret) {
+ goto end;
+ }
+ }
+
+ ret = send_command(sock,
+ RELAYD_CREATE_TRACE_CHUNK,
+ payload.data,
+ payload.size,
+ 0);
+ if (ret < 0) {
+ ERR("Failed to send trace chunk creation command to relay daemon");
+ goto end;
+ }
+
+ ret = recv_reply(sock, &reply, sizeof(reply));
+ if (ret < 0) {
+ ERR("Failed to receive relay daemon trace chunk creation command reply");
+ goto end;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd trace chunk create replied error %d",
+ reply.ret_code);
+ } else {
+ ret = 0;
+ DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
+ chunk_id);
+ }
+
+end:
+ lttng_dynamic_buffer_reset(&payload);
+ return ret;
+}
uint64_t stream_id, uint64_t version);
int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
uint64_t new_chunk_id, uint64_t seq_num);
+int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
+ struct lttng_trace_chunk *chunk);
#endif /* _RELAYD_H */
char new_pathname[];
} LTTNG_PACKED;
+struct lttcomm_relayd_create_trace_chunk {
+ uint64_t chunk_id;
+ /* Seconds since EPOCH. */
+ uint64_t creation_timestamp;
+ /* Includes trailing NULL. */
+ uint32_t override_name_length;
+ char override_name[];
+} LTTNG_PACKED;
+
#endif /* _RELAYD_COMM */
RELAYD_RESET_METADATA = 17,
/* Ask the relay to rotate a stream file (2.11+) */
RELAYD_ROTATE_STREAM = 18,
- /* Rename a chunk after the rotation is completed (2.11+) */
- RELAYD_ROTATE_RENAME = 19,
- /* Check if a chunk has data pending (2.11+) */
- RELAYD_ROTATE_PENDING = 20,
- /* Create a folder on the relayd FS (2.11+) */
- RELAYD_MKDIR = 21,
+ /* Ask the relay to create a trace chunk (2.11+) */
+ RELAYD_CREATE_TRACE_CHUNK = 19,
};
/*
uint64_t session_id;
uint64_t chunk_id;
uint64_t creation_timestamp;
- struct {
+ LTTNG_OPTIONAL_COMM(struct {
uint32_t uid;
uint32_t gid;
- } LTTNG_PACKED credentials;
+ } LTTNG_PACKED ) LTTNG_PACKED credentials;
} LTTNG_PACKED create_trace_chunk;
struct {
LTTNG_OPTIONAL_COMM(uint64_t) LTTNG_PACKED relayd_id;
case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
{
const struct lttng_credentials credentials = {
- .uid = msg.u.create_trace_chunk.credentials.uid,
- .gid = msg.u.create_trace_chunk.credentials.gid,
+ .uid = msg.u.create_trace_chunk.credentials.value.uid,
+ .gid = msg.u.create_trace_chunk.credentials.value.gid,
};
const bool is_local_trace =
!msg.u.create_trace_chunk.relayd_id.is_set;
!is_local_trace ? &relayd_id : NULL,
msg.u.create_trace_chunk.session_id,
msg.u.create_trace_chunk.chunk_id,
- (time_t) msg.u.create_trace_chunk.creation_timestamp,
+ (time_t) msg.u.create_trace_chunk
+ .creation_timestamp,
chunk_override_name,
- &credentials,
+ msg.u.create_trace_chunk.credentials.is_set ?
+ &credentials :
+ NULL,
chunk_directory_handle.is_set ?
&chunk_directory_handle.value :
NULL);