int cmd_create_session_2_11(const struct lttng_buffer_view *payload,
char *session_name, char *hostname,
uint32_t *live_timer, bool *snapshot,
- lttng_uuid sessiond_uuid)
+ uint64_t *id_sessiond, lttng_uuid sessiond_uuid,
+ uint64_t *current_chunk_id)
{
int ret;
struct lttcomm_relayd_create_session_2_11 header;
int cmd_create_session_2_11(const struct lttng_buffer_view *payload,
char *session_name, char *hostname,
uint32_t *live_timer, bool *snapshot,
- lttng_uuid sessiond_uuid);
+ uint64_t *id_sessiond, lttng_uuid sessiond_uuid,
+ uint64_t *current_chunk_id);
int cmd_recv_stream_2_11(const struct lttng_buffer_view *payload,
char **ret_path_name, char **ret_channel_name,
int ret = 0;
ssize_t send_ret;
struct relay_session *session = NULL;
- struct lttcomm_relayd_status_session reply;
- char session_name[LTTNG_NAME_MAX];
- char hostname[LTTNG_HOST_NAME_MAX];
+ struct lttcomm_relayd_status_session reply = {};
+ char session_name[LTTNG_NAME_MAX] = {};
+ char hostname[LTTNG_HOST_NAME_MAX] = {};
uint32_t live_timer = 0;
bool snapshot = false;
/* Left nil for peers < 2.11. */
lttng_uuid sessiond_uuid = {};
-
- memset(session_name, 0, LTTNG_NAME_MAX);
- memset(hostname, 0, LTTNG_HOST_NAME_MAX);
-
- memset(&reply, 0, sizeof(reply));
+ LTTNG_OPTIONAL(uint64_t) id_sessiond = {};
+ LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
if (conn->minor < 4) {
/* From 2.1 to 2.3 */
/* From 2.11 to ... */
ret = cmd_create_session_2_11(payload, session_name,
hostname, &live_timer, &snapshot,
- sessiond_uuid);
+ &id_sessiond.value, sessiond_uuid,
+ ¤t_chunk_id.value);
if (lttng_uuid_is_nil(sessiond_uuid)) {
/* The nil UUID is reserved for pre-2.11 clients. */
ERR("Illegal nil UUID announced by peer in create session command");
ret = -1;
goto send_reply;
}
+ id_sessiond.is_set = true;
+ current_chunk_id.is_set = true;
}
if (ret < 0) {
}
session = session_create(session_name, hostname, live_timer,
- snapshot, sessiond_uuid, conn->major, conn->minor);
+ snapshot, sessiond_uuid,
+ id_sessiond.is_set ? &id_sessiond.value : NULL,
+ current_chunk_id.is_set ? ¤t_chunk_id.value : NULL,
+ conn->major, conn->minor);
if (!session) {
ret = -1;
goto send_reply;
session->current_trace_chunk =
sessiond_trace_chunk_registry_get_anonymous_chunk(
sessiond_trace_chunk_registry, sessiond_uuid,
- session->id,
- opt_output_path);
+ session->id);
if (!session->current_trace_chunk) {
ret = -1;
}
#define _LGPL_SOURCE
#include <common/common.h>
+#include <common/compat/uuid.h>
#include <urcu/rculist.h>
#include "lttng-relayd.h"
static uint64_t last_relay_session_id;
static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
+static int session_set_anonymous_chunk(struct relay_session *session)
+{
+ int ret = 0;
+ struct lttng_trace_chunk *chunk = NULL;
+ enum lttng_trace_chunk_status status;
+ struct lttng_directory_handle output_directory;
+
+ ret = lttng_directory_handle_init(&output_directory, opt_output_path);
+ if (ret) {
+ goto end;
+ }
+
+ chunk = lttng_trace_chunk_create_anonymous();
+ if (!chunk) {
+ goto end;
+ }
+
+ status = lttng_trace_chunk_set_credentials_current_user(chunk);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+
+ status = lttng_trace_chunk_set_as_owner(chunk, &output_directory);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+ session->current_trace_chunk = chunk;
+ chunk = NULL;
+end:
+ lttng_trace_chunk_put(chunk);
+ lttng_directory_handle_fini(&output_directory);
+ return ret;
+}
+
/*
* Create a new session by assigning a new session ID.
*
struct relay_session *session_create(const char *session_name,
const char *hostname, uint32_t live_timer,
bool snapshot, const lttng_uuid sessiond_uuid,
+ uint64_t *id_sessiond, uint64_t *current_chunk_id,
uint32_t major, uint32_t minor)
{
int ret;
session = zmalloc(sizeof(*session));
if (!session) {
- PERROR("relay session zmalloc");
+ PERROR("Failed to allocate session");
goto error;
}
if (lttng_strncpy(session->session_name, session_name,
sizeof(session->session_name))) {
+ WARN("Session name exceeds maximal allowed length");
goto error;
}
if (lttng_strncpy(session->hostname, hostname,
sizeof(session->hostname))) {
+ WARN("Hostname exceeds maximal allowed length");
goto error;
}
session->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
session->snapshot = snapshot;
lttng_uuid_copy(session->sessiond_uuid, sessiond_uuid);
+ if (id_sessiond) {
+ LTTNG_OPTIONAL_SET(&session->id_sessiond, *id_sessiond);
+ }
+
ret = sessiond_trace_chunk_registry_session_created(
sessiond_trace_chunk_registry, sessiond_uuid);
if (ret) {
goto error;
}
+ if (id_sessiond && current_chunk_id) {
+ session->current_trace_chunk =
+ sessiond_trace_chunk_registry_get_chunk(
+ sessiond_trace_chunk_registry,
+ session->sessiond_uuid,
+ session->id_sessiond.value,
+ *current_chunk_id);
+ if (!session->current_trace_chunk) {
+ char uuid_str[UUID_STR_LEN];
+
+ lttng_uuid_to_str(sessiond_uuid, uuid_str);
+ ERR("Could not find trace chunk: sessiond = {%s}, sessiond session id = %" PRIu64 ", trace chunk id = %" PRIu64,
+ uuid_str, *id_sessiond,
+ *current_chunk_id);
+ }
+ } else if (!id_sessiond) {
+ /*
+ * Pre-2.11 peers will not announce trace chunks. An
+ * anonymous trace chunk which will remain set for the
+ * duration of the session is created.
+ */
+ ret = session_set_anonymous_chunk(session);
+ if (ret) {
+ goto error;
+ }
+ }
+
lttng_ht_add_unique_u64(sessions_ht, &session->session_n);
return session;
error:
- free(session);
+ session_put(session);
return NULL;
}
#include <common/hashtable/hashtable.h>
#include <common/compat/uuid.h>
#include <common/trace-chunk.h>
+#include <common/optional.h>
/*
* Represents a session for the relay point of view
* It is used to match a set of streams to their session.
*/
uint64_t id;
+ /*
+ * ID of the session in the session daemon's domain.
+ * This information is only provided by 2.11+ peers.
+ */
+ LTTNG_OPTIONAL(uint64_t) id_sessiond;
+ /*
+ * Only provided by 2.11+ peers. However, the UUID is set to 'nil' in
+ * the other cases.
+ */
lttng_uuid sessiond_uuid;
char session_name[LTTNG_NAME_MAX];
char hostname[LTTNG_HOST_NAME_MAX];
struct relay_session *session_create(const char *session_name,
const char *hostname, uint32_t live_timer,
bool snapshot, const lttng_uuid sessiond_uuid,
+ uint64_t *id_sessiond, uint64_t *current_chunk_id,
uint32_t major, uint32_t minor);
struct relay_session *session_get_by_id(uint64_t id);
bool session_get(struct relay_session *session);
struct consumer_output *consumer,
struct consumer_socket *consumer_sock,
const char *session_name, const char *hostname,
- int session_live_timer)
+ int session_live_timer,
+ const uint64_t *current_chunk_id)
{
int ret;
struct lttcomm_relayd_sock *rsock = NULL;
/* Send relayd socket to consumer. */
ret = consumer_send_relayd_socket(consumer_sock, rsock, consumer,
relayd_uri->stype, session_id,
- session_name, hostname, session_live_timer);
+ session_name, hostname, session_live_timer,
+ current_chunk_id);
if (ret < 0) {
status = LTTNG_ERR_ENABLE_CONSUMER_FAIL;
goto close_sock;
enum lttng_domain_type domain,
unsigned int session_id, struct consumer_output *consumer,
struct consumer_socket *sock, const char *session_name,
- const char *hostname, int session_live_timer)
+ const char *hostname, int session_live_timer,
+ const uint64_t *current_chunk_id)
{
enum lttng_error_code status = LTTNG_OK;
if (!sock->control_sock_sent) {
status = send_consumer_relayd_socket(session_id,
&consumer->dst.net.control, consumer, sock,
- session_name, hostname, session_live_timer);
+ session_name, hostname, session_live_timer,
+ current_chunk_id);
if (status != LTTNG_OK) {
goto error;
}
if (!sock->data_sock_sent) {
status = send_consumer_relayd_socket(session_id,
&consumer->dst.net.data, consumer, sock,
- session_name, hostname, session_live_timer);
+ session_name, hostname, session_live_timer,
+ current_chunk_id);
if (status != LTTNG_OK) {
goto error;
}
struct ltt_kernel_session *ksess;
struct consumer_socket *socket;
struct lttng_ht_iter iter;
+ LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
- assert(session);
+ assert(session);
usess = session->ust_session;
ksess = session->kernel_session;
DBG("Setting relayd for session %s", session->name);
+ if (session->current_trace_chunk) {
+ enum lttng_trace_chunk_status status = lttng_trace_chunk_get_id(
+ session->current_trace_chunk, ¤t_chunk_id.value);
+
+ if (status == LTTNG_TRACE_CHUNK_STATUS_OK) {
+ current_chunk_id.is_set = true;
+ } else {
+ ERR("Failed to get current trace chunk id");
+ ret = LTTNG_ERR_UNK;
+ goto error;
+ }
+ }
+
rcu_read_lock();
if (usess && usess->consumer && usess->consumer->type == CONSUMER_DST_NET
ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session->id,
usess->consumer, socket,
session->name, session->hostname,
- session->live_timer);
+ session->live_timer,
+ current_chunk_id.is_set ? ¤t_chunk_id.value : NULL);
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
goto error;
ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session->id,
ksess->consumer, socket,
session->name, session->hostname,
- session->live_timer);
+ session->live_timer,
+ current_chunk_id.is_set ? ¤t_chunk_id.value : NULL);
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
goto error;
enum lttng_error_code status = LTTNG_OK;
struct lttng_ht_iter iter;
struct consumer_socket *socket;
+ LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
assert(consumer);
assert(snap_output);
DBG2("Set relayd object from snapshot output");
+ if (session->current_trace_chunk) {
+ enum lttng_trace_chunk_status status = lttng_trace_chunk_get_id(
+ session->current_trace_chunk, ¤t_chunk_id.value);
+
+ if (status == LTTNG_TRACE_CHUNK_STATUS_OK) {
+ current_chunk_id.is_set = true;
+ } else {
+ ERR("Failed to get current trace chunk id");
+ status = LTTNG_ERR_UNK;
+ goto error;
+ }
+ }
+
/* Ignore if snapshot consumer output is not network. */
if (snap_output->consumer->type != CONSUMER_DST_NET) {
goto error;
status = send_consumer_relayd_sockets(0, session->id,
snap_output->consumer, socket,
session->name, session->hostname,
- session->live_timer);
+ session->live_timer,
+ current_chunk_id.is_set ? ¤t_chunk_id.value : NULL);
pthread_mutex_unlock(socket->lock);
if (status != LTTNG_OK) {
rcu_read_unlock();
struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
enum lttng_stream_type type, uint64_t session_id,
const char *session_name, const char *hostname,
- int session_live_timer)
+ int session_live_timer, const uint64_t *current_chunk_id)
{
int ret;
struct lttcomm_consumer_msg msg;
&msg.u.relayd_sock.relayd_session_id,
session_name, hostname, session_live_timer,
consumer->snapshot, session_id,
- sessiond_uuid);
+ sessiond_uuid, current_chunk_id);
if (ret < 0) {
/* Close the control socket. */
(void) relayd_close(rsock);
struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
enum lttng_stream_type type, uint64_t session_id,
const char *session_name, const char *hostname,
- int session_live_timer);
+ int session_live_timer, const uint64_t *current_chunk_id);
int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
int pipe);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
int relayd_connect(struct lttcomm_relayd_sock *sock);
int relayd_close(struct lttcomm_relayd_sock *sock);
-int relayd_create_session(struct lttcomm_relayd_sock *sock,
- uint64_t *relayd_session_id, const char *session_name,
- const char *hostname, int session_live_timer,
+int relayd_create_session(struct lttcomm_relayd_sock *rsock,
+ uint64_t *relayd_session_id,
+ const char *session_name, const char *hostname,
+ int session_live_timer,
unsigned int snapshot, uint64_t sessiond_session_id,
- const lttng_uuid sessiond_uuid);
+ const lttng_uuid sessiond_uuid,
+ const uint64_t *current_chunk_id);
int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name,
const char *pathname, uint64_t *stream_id,
uint64_t tracefile_size, uint64_t tracefile_count,