ERR("Path name too long");
goto error;
}
- path_name = create_output_path(stream_info.pathname);
+ path_name = strdup(stream_info.pathname);
if (!path_name) {
PERROR("Path name allocation");
ret = -ENOMEM;
header.pathname_len = be32toh(header.pathname_len);
header.tracefile_size = be64toh(header.tracefile_size);
header.tracefile_count = be64toh(header.tracefile_count);
- header.trace_archive_id = be64toh(header.trace_archive_id);
+ header.trace_chunk_id = be64toh(header.trace_chunk_id);
received_names_size = header.channel_name_len + header.pathname_len;
if (payload->size < header_len + received_names_size) {
goto error;
}
- path_name = create_output_path(pathname_view.data);
+ path_name = strdup(pathname_view.data);
if (!path_name) {
PERROR("Path name allocation");
ret = -ENOMEM;
*tracefile_size = header.tracefile_size;
*tracefile_count = header.tracefile_count;
- *trace_archive_id = header.trace_archive_id;
+ *trace_archive_id = header.trace_chunk_id;
*ret_path_name = path_name;
*ret_channel_name = channel_name;
/* Move ownership to caller */
ERR("Path name too long");
goto error;
}
- path_name = create_output_path(stream_info.pathname);
+ path_name = strdup(stream_info.pathname);
if (!path_name) {
PERROR("Path name allocation");
ret = -ENOMEM;
assert(cds_list_empty(&trace->stream_list));
session_put(trace->session);
trace->session = NULL;
+ free(trace->path);
+ trace->path = NULL;
call_rcu(&trace->rcu_node, rcu_destroy_ctf_trace);
}
* put their reference, its refcount drops to 0.
*/
static struct ctf_trace *ctf_trace_create(struct relay_session *session,
- char *path_name)
+ const char *subpath)
{
struct ctf_trace *trace;
trace = zmalloc(sizeof(*trace));
if (!trace) {
- PERROR("ctf_trace alloc");
- goto error;
+ PERROR("Failed to allocate ctf_trace");
+ goto end;
}
+ urcu_ref_init(&trace->ref);
if (!session_get(session)) {
- ERR("Cannot get session");
- free(trace);
- trace = NULL;
+ ERR("Failed to acquire session reference");
goto error;
}
trace->session = session;
+ trace->path = strdup(subpath);
+ if (!trace->path) {
+ goto error;
+ }
CDS_INIT_LIST_HEAD(&trace->stream_list);
trace->id = ++last_relay_ctf_trace_id;
pthread_mutex_unlock(&last_relay_ctf_trace_id_lock);
- lttng_ht_node_init_str(&trace->node, path_name);
+ lttng_ht_node_init_str(&trace->node, trace->path);
trace->session = session;
- urcu_ref_init(&trace->ref);
pthread_mutex_init(&trace->lock, NULL);
pthread_mutex_init(&trace->stream_list_lock, NULL);
lttng_ht_add_str(session->ctf_traces_ht, &trace->node);
- DBG("Created ctf_trace %" PRIu64 " with path: %s", trace->id, path_name);
+ DBG("Created ctf_trace %" PRIu64 "of session \"%s\" from host \"%s\" with path: %s",
+ trace->id, session->session_name, session->hostname,
+ subpath);
-error:
+end:
return trace;
+error:
+ ctf_trace_put(trace);
+ return NULL;
}
/*
* ctf_trace_put().
*/
struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
- char *path_name)
+ const char *subpath)
{
struct lttng_ht_node_str *node;
struct lttng_ht_iter iter;
struct ctf_trace *trace = NULL;
rcu_read_lock();
- lttng_ht_lookup(session->ctf_traces_ht, (void *) path_name, &iter);
+ lttng_ht_lookup(session->ctf_traces_ht, subpath, &iter);
node = lttng_ht_iter_get_node_str(&iter);
if (!node) {
- DBG("CTF Trace path %s not found", path_name);
+ DBG("CTF Trace path %s not found", subpath);
goto end;
}
trace = caa_container_of(node, struct ctf_trace, node);
rcu_read_unlock();
if (!trace) {
/* Try to create */
- trace = ctf_trace_create(session, path_name);
+ trace = ctf_trace_create(session, subpath);
}
return trace;
}
struct urcu_ref ref; /* Every stream has a ref on the trace. */
struct relay_session *session; /* Back ref to trace session */
+ /* Trace sub-folder relative to the session output path. */
+ char *path;
+ bool index_folder_created;
+
/*
* The ctf_trace lock nests inside the session lock.
*/
};
struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
- char *path_name);
+ const char *subpath);
bool ctf_trace_get(struct ctf_trace *trace);
void ctf_trace_put(struct ctf_trace *trace);
pthread_mutex_unlock(&session->lock);
}
+static int conform_channel_path(char *channel_path)
+{
+ int ret = 0;
+
+ if (strstr("../", channel_path)) {
+ ERR("Refusing channel path as it walks up the path hierarchy: \"%s\"",
+ channel_path);
+ ret = -1;
+ goto end;
+ }
+
+ if (*channel_path == '/') {
+ const size_t len = strlen(channel_path);
+
+ /*
+ * Channel paths from peers prior to 2.11 are expressed as an
+ * absolute path that is, in reality, relative to the relay
+ * daemon's output directory. Remove the leading slash so it
+ * is correctly interpreted as a relative path later on.
+ *
+ * len (and not len - 1) is used to copy the trailing NULL.
+ */
+ bcopy(channel_path + 1, channel_path, len);
+ }
+end:
+ return ret;
+}
+
/*
* relay_add_stream: allocate a new stream for a session
*/
uint64_t stream_handle = -1ULL;
char *path_name = NULL, *channel_name = NULL;
uint64_t tracefile_size = 0, tracefile_count = 0;
- struct relay_stream_chunk_id stream_chunk_id = { 0 };
+ LTTNG_OPTIONAL(uint64_t) stream_chunk_id = {};
if (!session || !conn->version_check_done) {
ERR("Trying to add a stream before version check");
goto send_reply;
}
+ if (conform_channel_path(path_name)) {
+ goto send_reply;
+ }
+
trace = ctf_trace_get_by_path_or_create(session, path_name);
if (!trace) {
goto send_reply;
/* We pass ownership of path_name and channel_name. */
stream = stream_create(trace, stream_handle, path_name,
- channel_name, tracefile_size, tracefile_count,
- &stream_chunk_id);
+ channel_name, tracefile_size, tracefile_count);
path_name = NULL;
channel_name = NULL;
*/
static
int create_rotate_index_file(struct relay_stream *stream,
- const char *stream_path)
+ const char *channel_path)
{
int ret;
uint32_t major, minor;
+ ASSERT_LOCKED(stream->lock);
+
/* Put ref on previous index_file. */
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
}
major = stream->trace->session->major;
minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream_path,
- stream->channel_name,
- -1, -1, stream->tracefile_size,
- tracefile_array_get_file_index_head(stream->tfa),
+ if (!stream->trace->index_folder_created) {
+ char *index_subpath = NULL;
+
+ ret = asprintf(&index_subpath, "%s/%s", channel_path, DEFAULT_INDEX_DIR);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = lttng_trace_chunk_create_subdirectory(stream->trace_chunk, index_subpath);
+ free(index_subpath);
+ if (ret) {
+ goto end;
+ }
+ stream->trace->index_folder_created = true;
+ }
+ stream->index_file = lttng_index_file_create_from_trace_chunk(
+ stream->trace_chunk, channel_path, stream->channel_name,
+ stream->tracefile_size, stream->tracefile_count,
lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor));
+ lttng_to_index_minor(major, minor), true);
if (!stream->index_file) {
ret = -1;
goto end;
goto end_stream_unlock;
}
- assert(stream->current_chunk_id.is_set);
- stream->current_chunk_id.value = stream_info.new_chunk_id;
-
if (stream->is_metadata) {
/*
* Metadata streams have no index; consider its rotation
* e.g. /home/user/lttng-traces/hostname/session_name
*/
char *full_session_path = NULL;
+ char creation_time_str[16];
+ struct tm *timeinfo;
+
+ assert(session->creation_time.is_set);
+ timeinfo = localtime(&session->creation_time.value);
+ if (!timeinfo) {
+ ret = -1;
+ goto end;
+ }
+ strftime(creation_time_str, sizeof(creation_time_str), "%Y%m%d-%H%M%S",
+ timeinfo);
pthread_mutex_lock(&session->lock);
- ret = asprintf(&session_directory, "%s/%s", session->hostname,
- session->session_name);
+ ret = asprintf(&session_directory, "%s/%s-%s", session->hostname,
+ session->session_name, creation_time_str);
pthread_mutex_unlock(&session->lock);
if (ret < 0) {
PERROR("Failed to format session directory name");
#include "stream.h"
#include "viewer-stream.h"
+#include <sys/types.h>
+#include <fcntl.h>
+
/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
return stream;
}
+static int stream_create_data_output_file(struct relay_stream *stream)
+{
+ int ret, fd;
+ enum lttng_trace_chunk_status status;
+ const int flags = O_RDWR | O_CREAT | O_TRUNC;
+ const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+ char stream_path[LTTNG_PATH_MAX];
+
+ ASSERT_LOCKED(stream->lock);
+ assert(stream->trace_chunk);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+
+ ret = utils_stream_file_path(stream->path_name, stream->channel_name,
+ stream->tracefile_size, stream->tracefile_count, NULL,
+ stream_path, sizeof(stream_path));
+ if (ret < 0) {
+ goto end;
+ }
+
+ DBG("Opening stream output file \"%s\"", stream_path);
+ status = lttng_trace_chunk_open_file(
+ stream->trace_chunk, stream_path, flags, mode, &fd);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to open stream file \"%s\"", stream->channel_name);
+ ret = -1;
+ goto end;
+ }
+
+ stream->stream_fd = stream_fd_create(fd);
+ if (!stream->stream_fd) {
+ if (close(ret)) {
+ PERROR("Error closing stream file descriptor %d", ret);
+ }
+ ret = -1;
+ goto end;
+ }
+end:
+ return ret;
+}
+
+static int stream_set_trace_chunk(struct relay_stream *stream,
+ struct lttng_trace_chunk *chunk)
+{
+ int ret = 0;
+ enum lttng_trace_chunk_status status;
+ bool acquired_reference;
+
+ pthread_mutex_lock(&stream->lock);
+ status = lttng_trace_chunk_create_subdirectory(chunk,
+ stream->path_name);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+
+ lttng_trace_chunk_put(stream->trace_chunk);
+ acquired_reference = lttng_trace_chunk_get(chunk);
+ assert(acquired_reference);
+ stream->trace_chunk = chunk;
+ ret = stream_create_data_output_file(stream);
+end:
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
+}
+
/*
* We keep ownership of path_name and channel_name.
*/
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count,
- const struct relay_stream_chunk_id *chunk_id)
+ uint64_t tracefile_count)
{
int ret;
struct relay_stream *stream = NULL;
struct relay_session *session = trace->session;
+ bool acquired_reference = false;
+ struct lttng_trace_chunk *current_trace_chunk;
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
- stream->current_chunk_id = *chunk_id;
- stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!stream->indexes_ht) {
- ERR("Cannot created indexes_ht");
+ pthread_mutex_lock(&trace->session->lock);
+ current_trace_chunk = trace->session->current_trace_chunk;
+ if (current_trace_chunk) {
+ acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
+ }
+ pthread_mutex_unlock(&trace->session->lock);
+ if (!acquired_reference) {
+ ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
+ channel_name);
ret = -1;
goto end;
}
- ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
- -1, -1);
- if (ret < 0) {
- ERR("relay creating output directory");
+ stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!stream->indexes_ht) {
+ ERR("Cannot created indexes_ht");
+ ret = -1;
goto end;
}
- /*
- * No need to use run_as API here because whatever we receive,
- * the relayd uses its own credentials for the stream files.
- */
- ret = utils_create_stream_file(stream->path_name, stream->channel_name,
- stream->tracefile_size, 0, -1, -1, NULL);
- if (ret < 0) {
- ERR("Create output file");
- goto end;
- }
- stream->stream_fd = stream_fd_create(ret);
- if (!stream->stream_fd) {
- if (close(ret)) {
- PERROR("Error closing file %d", ret);
- }
+ ret = stream_set_trace_chunk(stream, current_trace_chunk);
+ if (ret) {
+ ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
+ trace->session->session_name,
+ stream->channel_name);
ret = -1;
goto end;
}
ret = -1;
goto end;
}
- if (stream->tracefile_size) {
- DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
- } else {
- DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
- }
-
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) {
- stream->is_metadata = 1;
- }
+ stream->is_metadata = !strcmp(stream->channel_name,
+ DEFAULT_METADATA_NAME);
stream->in_recv_list = true;
/*
stream_put(stream);
stream = NULL;
}
+ lttng_trace_chunk_put(current_trace_chunk);
return stream;
error_no_alloc:
ctf_trace_put(stream->trace);
stream->trace = NULL;
}
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
call_rcu(&stream->rcu_node, stream_destroy_rcu);
}
#include <urcu/list.h>
#include <common/hashtable/hashtable.h>
+#include <common/trace-chunk.h>
#include "session.h"
#include "stream-fd.h"
#include "tracefile-array.h"
-struct relay_stream_chunk_id {
- bool is_set;
- uint64_t value;
-};
-
/*
* Represents a stream in the relay
*/
bool data_rotated;
bool index_rotated;
/*
- * This is the id of the chunk where we are writing to if no rotation is
- * pending (rotate_at_seq_num == -1ULL). If a rotation is pending, this
- * is the chunk_id we will have after the rotation. It must be updated
- * atomically with rotate_at_seq_num.
- *
- * Always access with stream lock held.
- *
- * This attribute is not set if the stream is created by a pre-2.11
- * consumer.
+ * `trace_chunk` is the trace chunk to which the file currently
+ * being produced (if any) belongs.
*/
- struct relay_stream_chunk_id current_chunk_id;
+ struct lttng_trace_chunk *trace_chunk;
};
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count, const struct relay_stream_chunk_id *chunk_id);
+ uint64_t tracefile_count);
struct relay_stream *stream_get_by_id(uint64_t stream_id);
bool stream_get(struct relay_stream *stream);
struct lttng_trace_chunk *trace_chunk;
trace_chunk = session_create_new_trace_chunk(
- session, NULL, NULL);
+ session, NULL, NULL, NULL);
if (!trace_chunk) {
ret = LTTNG_ERR_CREATE_DIR_FAIL;
goto error;
* Return LTTNG_OK on success or a LTTNG_ERR code.
*/
static enum lttng_error_code set_relayd_for_snapshot(
- struct consumer_output *consumer,
- const struct snapshot_output *snap_output,
+ struct consumer_output *output,
const struct ltt_session *session)
{
enum lttng_error_code status = LTTNG_OK;
struct consumer_socket *socket;
LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
- assert(consumer);
- assert(snap_output);
+ assert(output);
assert(session);
DBG2("Set relayd object from snapshot output");
if (session->current_trace_chunk) {
- enum lttng_trace_chunk_status status = lttng_trace_chunk_get_id(
- session->current_trace_chunk, ¤t_chunk_id.value);
+ enum lttng_trace_chunk_status chunk_status =
+ lttng_trace_chunk_get_id(
+ session->current_trace_chunk,
+ ¤t_chunk_id.value);
- if (status == LTTNG_TRACE_CHUNK_STATUS_OK) {
+ if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK) {
current_chunk_id.is_set = true;
} else {
ERR("Failed to get current trace chunk id");
}
/* Ignore if snapshot consumer output is not network. */
- if (snap_output->consumer->type != CONSUMER_DST_NET) {
+ if (output->type != CONSUMER_DST_NET) {
goto error;
}
* snapshot output.
*/
rcu_read_lock();
- cds_lfht_for_each_entry(snap_output->consumer->socks->ht, &iter.iter,
+ cds_lfht_for_each_entry(output->socks->ht, &iter.iter,
socket, node.node) {
pthread_mutex_lock(socket->lock);
status = send_consumer_relayd_sockets(0, session->id,
- snap_output->consumer, socket,
+ output, socket,
session->name, session->hostname,
session->live_timer,
current_chunk_id.is_set ? ¤t_chunk_id.value : NULL,
*/
static enum lttng_error_code record_kernel_snapshot(
struct ltt_kernel_session *ksess,
- const struct snapshot_output *output,
+ const struct consumer_output *output,
const struct ltt_session *session,
int wait, uint64_t nb_packets_per_stream)
{
- int ret;
enum lttng_error_code status;
assert(ksess);
assert(output);
assert(session);
- /*
- * Copy kernel session sockets so we can communicate with the right
- * consumer for the snapshot record command.
- */
- ret = consumer_copy_sockets(output->consumer, ksess->consumer);
- if (ret < 0) {
- status = LTTNG_ERR_NOMEM;
- goto error;
- }
-
- status = set_relayd_for_snapshot(ksess->consumer, output, session);
- if (status != LTTNG_OK) {
- goto error_snapshot;
- }
-
- status = kernel_snapshot_record(ksess, output, wait, nb_packets_per_stream);
- if (status != LTTNG_OK) {
- goto error_snapshot;
- }
-
- goto end;
-
-error_snapshot:
- /* Clean up copied sockets so this output can use some other later on. */
- consumer_destroy_output_sockets(output->consumer);
-error:
-end:
+ status = kernel_snapshot_record(
+ ksess, output, wait, nb_packets_per_stream);
return status;
}
* Returns LTTNG_OK on success or a LTTNG_ERR error code.
*/
static enum lttng_error_code record_ust_snapshot(struct ltt_ust_session *usess,
- const struct snapshot_output *output,
- const struct ltt_session *session, int wait,
- uint64_t nb_packets_per_stream)
+ const struct consumer_output *output,
+ const struct ltt_session *session,
+ int wait, uint64_t nb_packets_per_stream)
{
- int ret;
enum lttng_error_code status;
assert(usess);
assert(output);
assert(session);
- /*
- * Copy UST session sockets so we can communicate with the right
- * consumer for the snapshot record command.
- */
- ret = consumer_copy_sockets(output->consumer, usess->consumer);
- if (ret < 0) {
- status = LTTNG_ERR_NOMEM;
- goto error;
- }
-
- status = set_relayd_for_snapshot(usess->consumer, output, session);
- if (status != LTTNG_OK) {
- goto error_snapshot;
- }
-
- status = ust_app_snapshot_record(usess, output, wait,
- nb_packets_per_stream);
- if (status != LTTNG_OK) {
- goto error_snapshot;
- }
-
- goto end;
-
-error_snapshot:
- /* Clean up copied sockets so this output can use some other later on. */
- consumer_destroy_output_sockets(output->consumer);
-error:
-end:
+ status = ust_app_snapshot_record(
+ usess, output, wait, nb_packets_per_stream);
return status;
}
enum lttng_error_code snapshot_record(struct ltt_session *session,
const struct snapshot_output *snapshot_output, int wait)
{
- int fmt_ret;
int64_t nb_packets_per_stream;
char snapshot_chunk_name[LTTNG_NAME_MAX];
- enum lttng_error_code ret = LTTNG_OK;
+ int ret;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct lttng_trace_chunk *snapshot_trace_chunk;
+ struct consumer_output *original_ust_consumer_output = NULL;
+ struct consumer_output *original_kernel_consumer_output = NULL;
+ struct consumer_output *snapshot_ust_consumer_output = NULL;
+ struct consumer_output *snapshot_kernel_consumer_output = NULL;
- fmt_ret = snprintf(snapshot_chunk_name, sizeof(snapshot_chunk_name),
+ ret = snprintf(snapshot_chunk_name, sizeof(snapshot_chunk_name),
"%s-%s-%" PRIu64,
snapshot_output->name,
snapshot_output->datetime,
snapshot_output->nb_snapshot);
- if (fmt_ret < 0 || fmt_ret >= sizeof(snapshot_chunk_name)) {
+ if (ret < 0 || ret >= sizeof(snapshot_chunk_name)) {
ERR("Failed to format snapshot name");
- ret = LTTNG_ERR_INVALID;
- goto end;
+ ret_code = LTTNG_ERR_INVALID;
+ goto error;
}
DBG("Recording snapshot \"%s\" for session \"%s\" with chunk name \"%s\"",
snapshot_output->name, session->name,
snapshot_chunk_name);
+ if (!session->kernel_session && !session->ust_session) {
+ ERR("Failed to record snapshot as no channels exist");
+ ret_code = LTTNG_ERR_NO_CHANNEL;
+ goto error;
+ }
+
+ if (session->kernel_session) {
+ original_kernel_consumer_output =
+ session->kernel_session->consumer;
+ snapshot_kernel_consumer_output =
+ consumer_copy_output(snapshot_output->consumer);
+ ret = consumer_copy_sockets(snapshot_kernel_consumer_output,
+ original_kernel_consumer_output);
+ if (ret < 0) {
+ ERR("Failed to copy consumer sockets from snapshot output configuration");
+ ret_code = LTTNG_ERR_NOMEM;
+ goto error;
+ }
+ ret_code = set_relayd_for_snapshot(
+ snapshot_kernel_consumer_output, session);
+ if (ret_code != LTTNG_OK) {
+ ERR("Failed to setup relay daemon for kernel tracer snapshot");
+ goto error;
+ }
+ session->kernel_session->consumer =
+ snapshot_kernel_consumer_output;
+ }
+ if (session->ust_session) {
+ original_ust_consumer_output = session->ust_session->consumer;
+ snapshot_ust_consumer_output =
+ consumer_copy_output(snapshot_output->consumer);
+ ret = consumer_copy_sockets(snapshot_ust_consumer_output,
+ original_ust_consumer_output);
+ if (ret < 0) {
+ ERR("Failed to copy consumer sockets from snapshot output configuration");
+ ret_code = LTTNG_ERR_NOMEM;
+ goto error;
+ }
+ ret_code = set_relayd_for_snapshot(
+ snapshot_ust_consumer_output, session);
+ if (ret_code != LTTNG_OK) {
+ ERR("Failed to setup relay daemon for userspace tracer snapshot");
+ goto error;
+ }
+ session->ust_session->consumer =
+ snapshot_ust_consumer_output;
+ }
+
snapshot_trace_chunk = session_create_new_trace_chunk(session,
- snapshot_output_get_base_path(snapshot_output),
+ snapshot_kernel_consumer_output ?:
+ snapshot_ust_consumer_output,
+ consumer_output_get_base_path(
+ snapshot_output->consumer),
snapshot_chunk_name);
if (!snapshot_trace_chunk) {
- ret = LTTNG_ERR_CREATE_DIR_FAIL;
- goto end;
+ ERR("Failed to create temporary trace chunk to record a snapshot of session \"%s\"",
+ session->name);
+ ret_code = LTTNG_ERR_CREATE_DIR_FAIL;
+ goto error;
}
assert(!session->current_trace_chunk);
ret = session_set_trace_chunk(session, snapshot_trace_chunk, NULL);
lttng_trace_chunk_put(snapshot_trace_chunk);
snapshot_trace_chunk = NULL;
if (ret) {
- ret = LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
- goto end;
+ ERR("Failed to set temporary trace chunk to record a snapshot of session \"%s\"",
+ session->name);
+ ret_code = LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
+ goto error;
}
nb_packets_per_stream = get_session_nb_packets_per_stream(session,
snapshot_output->max_size);
if (nb_packets_per_stream < 0) {
- ret = LTTNG_ERR_MAX_SIZE_INVALID;
- goto end;
+ ret_code = LTTNG_ERR_MAX_SIZE_INVALID;
+ goto error;
}
if (session->kernel_session) {
- ret = record_kernel_snapshot(session->kernel_session,
- snapshot_output, session,
+ ret_code = record_kernel_snapshot(session->kernel_session,
+ snapshot_kernel_consumer_output, session,
wait, nb_packets_per_stream);
- if (ret != LTTNG_OK) {
- goto end;
+ if (ret_code != LTTNG_OK) {
+ goto error;
}
}
if (session->ust_session) {
- ret = record_ust_snapshot(session->ust_session,
- snapshot_output, session,
+ ret_code = record_ust_snapshot(session->ust_session,
+ snapshot_ust_consumer_output, session,
wait, nb_packets_per_stream);
- if (ret != LTTNG_OK) {
- goto end;
+ if (ret_code != LTTNG_OK) {
+ goto error;
}
}
*/
ERR("Failed to close snapshot trace chunk of session \"%s\"",
session->name);
- ret = -1;
+ ret_code = LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
}
if (session_set_trace_chunk(session, NULL, NULL)) {
ERR("Failed to release the current trace chunk of session \"%s\"",
session->name);
- ret = -1;
+ ret_code = LTTNG_ERR_UNK;
}
-end:
- return ret;
+error:
+ if (original_ust_consumer_output) {
+ session->ust_session->consumer = original_ust_consumer_output;
+ }
+ if (original_kernel_consumer_output) {
+ session->kernel_session->consumer =
+ original_kernel_consumer_output;
+ }
+ consumer_output_put(snapshot_ust_consumer_output);
+ consumer_output_put(snapshot_kernel_consumer_output);
+ return ret_code;
}
/*
session->rotation_state = LTTNG_ROTATION_STATE_ONGOING;
if (session->active) {
- new_trace_chunk = session_create_new_trace_chunk(session,
+ new_trace_chunk = session_create_new_trace_chunk(session, NULL,
NULL, NULL);
if (!new_trace_chunk) {
cmd_ret = LTTNG_ERR_CREATE_DIR_FAIL;
* object reference is not needed anymore.
*/
struct consumer_socket *consumer_find_socket_by_bitness(int bits,
- struct consumer_output *consumer)
+ const struct consumer_output *consumer)
{
int consumer_fd;
struct consumer_socket *socket = NULL;
* returned consumer_socket.
*/
struct consumer_socket *consumer_find_socket(int key,
- struct consumer_output *consumer)
+ const struct consumer_output *consumer)
{
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
}
/*
- * Delte consumer socket to consumer output object. Read side lock must be
+ * Delete consumer socket to consumer output object. Read side lock must be
* acquired before calling this function.
*/
void consumer_del_socket(struct consumer_socket *sock,
* Returns LTTNG_OK on success or else an LTTng error code.
*/
enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
- uint64_t key, const struct snapshot_output *output, int metadata,
+ uint64_t key, const struct consumer_output *output, int metadata,
uid_t uid, gid_t gid, const char *channel_path, int wait,
uint64_t nb_packets_per_stream)
{
assert(socket);
assert(output);
- assert(output->consumer);
DBG("Consumer snapshot channel key %" PRIu64, key);
msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
msg.u.snapshot_channel.metadata = metadata;
- if (output->consumer->type == CONSUMER_DST_NET) {
+ if (output->type == CONSUMER_DST_NET) {
msg.u.snapshot_channel.relayd_id =
- output->consumer->net_seq_index;
+ output->net_seq_index;
msg.u.snapshot_channel.use_relayd = 1;
} else {
msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
};
struct consumer_socket *consumer_find_socket(int key,
- struct consumer_output *consumer);
+ const struct consumer_output *consumer);
struct consumer_socket *consumer_find_socket_by_bitness(int bits,
- struct consumer_output *consumer);
+ const struct consumer_output *consumer);
struct consumer_socket *consumer_allocate_socket(int *fd);
void consumer_add_socket(struct consumer_socket *sock,
struct consumer_output *consumer);
/* Snapshot command. */
enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
- uint64_t key, const struct snapshot_output *output, int metadata,
+ uint64_t key, const struct consumer_output *output, int metadata,
uid_t uid, gid_t gid, const char *channel_path, int wait,
uint64_t nb_packets_per_stream);
*/
enum lttng_error_code kernel_snapshot_record(
struct ltt_kernel_session *ksess,
- const struct snapshot_output *output, int wait,
+ const struct consumer_output *output, int wait,
uint64_t nb_packets_per_stream)
{
int err, ret, saved_metadata_fd;
}
/* Send metadata to consumer and snapshot everything. */
- cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter,
+ cds_lfht_for_each_entry(output->socks->ht, &iter.iter,
socket, node.node) {
- struct consumer_output *saved_output;
struct ltt_kernel_channel *chan;
- /*
- * Temporarly switch consumer output for our snapshot output. As long
- * as the session lock is taken, this is safe.
- */
- saved_output = ksess->consumer;
- ksess->consumer = output->consumer;
-
pthread_mutex_lock(socket->lock);
/* This stream must not be monitored by the consumer. */
ret = kernel_consumer_add_metadata(socket, ksess, 0);
pthread_mutex_unlock(socket->lock);
- /* Put back the saved consumer output into the session. */
- ksess->consumer = saved_output;
if (ret < 0) {
status = LTTNG_ERR_KERN_META_FAIL;
goto error_consumer;
void kernel_destroy_channel(struct ltt_kernel_channel *kchan);
enum lttng_error_code kernel_snapshot_record(
struct ltt_kernel_session *ksess,
- const struct snapshot_output *output, int wait,
+ const struct consumer_output *output, int wait,
uint64_t nb_packets_per_stream);
int kernel_syscall_mask(int chan_fd, char **syscall_mask, uint32_t *nr_bits);
enum lttng_error_code kernel_rotate_session(struct ltt_session *session);
}
static
-bool output_supports_trace_chunks(const struct ltt_session *session)
+bool output_supports_trace_chunks(const struct consumer_output *output)
{
- if (session->consumer->type == CONSUMER_DST_LOCAL) {
+ if (output->type == CONSUMER_DST_LOCAL) {
return true;
} else {
- struct consumer_output *output;
-
- if (session->ust_session) {
- output = session->ust_session->consumer;
- } else if (session->kernel_session) {
- output = session->kernel_session->consumer;
- } else {
- abort();
- }
-
if (output->relay_major_version > 2) {
return true;
} else if (output->relay_major_version == 2 &&
}
struct lttng_trace_chunk *session_create_new_trace_chunk(
- struct ltt_session *session,
+ const struct ltt_session *session,
+ const struct consumer_output *consumer_output_override,
const char *session_base_path_override,
const char *chunk_name_override)
{
struct lttng_trace_chunk *trace_chunk = NULL;
enum lttng_trace_chunk_status chunk_status;
const time_t chunk_creation_ts = time(NULL);
- const bool is_local_trace =
- session->consumer->type == CONSUMER_DST_LOCAL;
- const char *base_path = session_base_path_override ? :
- session_get_base_path(session);
+ bool is_local_trace;
+ const char *base_path;
struct lttng_directory_handle session_output_directory;
const struct lttng_credentials session_credentials = {
.uid = session->uid,
.gid = session->gid,
};
uint64_t next_chunk_id;
+ const struct consumer_output *output;
+
+ if (consumer_output_override) {
+ output = consumer_output_override;
+ } else {
+ assert(session->ust_session || session->kernel_session);
+ output = session->ust_session ?
+ session->ust_session->consumer :
+ session->kernel_session->consumer;
+ }
+
+ is_local_trace = output->type == CONSUMER_DST_LOCAL;
+ base_path = session_base_path_override ? :
+ consumer_output_get_base_path(output);
if (chunk_creation_ts == (time_t) -1) {
PERROR("Failed to sample time while creation session \"%s\" trace chunk",
goto error;
}
- if (!output_supports_trace_chunks(session)) {
+ if (!output_supports_trace_chunks(output)) {
goto end;
}
next_chunk_id = session->most_recent_chunk_id.is_set ?
/* Create a new trace chunk object from the session's configuration. */
struct lttng_trace_chunk *session_create_new_trace_chunk(
- struct ltt_session *session,
+ const struct ltt_session *session,
+ const struct consumer_output *consumer_output_override,
const char *session_base_path_override,
const char *chunk_name_override);
*/
enum lttng_error_code ust_app_snapshot_record(
const struct ltt_ust_session *usess,
- const struct snapshot_output *output, int wait,
+ const struct consumer_output *output, int wait,
uint64_t nb_packets_per_stream)
{
int ret = 0;
/* Get the right consumer socket for the application. */
socket = consumer_find_socket_by_bitness(app->bits_per_long,
- output->consumer);
+ output);
if (!socket) {
status = LTTNG_ERR_INVALID;
goto error;
void ust_app_destroy(struct ust_app *app);
enum lttng_error_code ust_app_snapshot_record(
const struct ltt_ust_session *usess,
- const struct snapshot_output *output, int wait,
+ const struct consumer_output *output, int wait,
uint64_t nb_packets_per_stream);
uint64_t ust_app_get_size_one_more_packet_per_stream(
const struct ltt_ust_session *usess, uint64_t cur_nr_packets);
}
static inline
enum lttng_error_code ust_app_snapshot_record(struct ltt_ust_session *usess,
- struct snapshot_output *output, int wait, uint64_t max_stream_size)
+ struct consumer_output *output, int wait, uint64_t max_stream_size)
{
return 0;
}
}
/* Get correct path name destination */
- if (consumer->type == CONSUMER_DST_LOCAL) {
- /* Set application path to the destination path */
- ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s",
- consumer->domain_subdir, ua_sess->path);
- DBG3("Userspace local consumer trace path relative to current trace chunk: \"%s\"",
- pathname);
- } else {
+ if (consumer->type == CONSUMER_DST_NET &&
+ consumer->relay_major_version == 2 &&
+ consumer->relay_minor_version < 11) {
ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s/%s%s",
consumer->dst.net.base_dir,
- consumer->chunk_path,
- consumer->domain_subdir,
+ consumer->chunk_path, consumer->domain_subdir,
ua_sess->path);
+ } else {
+ ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s",
+ consumer->domain_subdir, ua_sess->path);
}
+ DBG3("Userspace consumer trace path relative to current trace chunk: \"%s\"",
+ pathname);
if (ret < 0) {
PERROR("Failed to format channel path");
goto error;
}
return pathname;
-
error:
free(pathname);
return NULL;
const char *session_get_base_path(const struct ltt_session *session)
{
- return session->net_handle > 0 ?
- session->consumer->dst.net.base_dir :
- session->consumer->dst.session_root_path;
+ return consumer_output_get_base_path(session->consumer);
}
-const char *snapshot_output_get_base_path(
- const struct snapshot_output *snapshot_output)
+const char *consumer_output_get_base_path(const struct consumer_output *output)
{
- return snapshot_output->consumer->type == CONSUMER_DST_LOCAL ?
- snapshot_output->consumer->dst.session_root_path :
- snapshot_output->consumer->dst.net.base_dir;
+ return output->type == CONSUMER_DST_LOCAL ?
+ output->dst.session_root_path :
+ output->dst.net.base_dir;
}
struct lttng_ht;
struct ltt_session;
-struct snapshot_output;
+struct consumer_output;
const char *get_home_dir(void);
int notify_thread_pipe(int wpipe);
int loglevels_match(int a_loglevel_type, int a_loglevel_value,
int b_loglevel_type, int b_loglevel_value, int loglevel_all_type);
const char *session_get_base_path(const struct ltt_session *session);
-const char *snapshot_output_get_base_path(
- const struct snapshot_output *snapshot_output);
+const char *consumer_output_get_base_path(const struct consumer_output *output);
#endif /* _LTT_UTILS_H */
* a session with per-PID buffers.
*/
uint64_t session_id_per_pid;
- /* Channel trace file path name. */
+ /*
+ * In the case of local streams, this field contains the channel's
+ * output path; a path relative to the session's output path.
+ * e.g. ust/uid/1000/64-bit
+ *
+ * In the case of remote streams, the contents of this field depends
+ * on the version of the relay daemon peer. For 2.11+ peers, the
+ * contents are the same as in the local case. However, for legacy
+ * peers, this contains a path of the form:
+ * /hostname/session_path/ust/uid/1000/64-bit
+ */
char pathname[PATH_MAX];
/* Channel name. */
char name[LTTNG_SYMBOL_NAME_LEN];
#include "index.h"
-/*
- * Create the index file associated with a trace file.
- *
- * Return allocated struct lttng_index_file, NULL on error.
- */
-struct lttng_index_file *lttng_index_file_create(const char *path_name,
- char *stream_name, int uid, int gid,
- uint64_t size, uint64_t count, uint32_t major, uint32_t minor)
-{
- struct lttng_index_file *index_file;
- int ret, fd = -1;
- ssize_t size_ret;
- struct ctf_packet_index_file_hdr hdr;
- char fullpath[PATH_MAX];
- uint32_t element_len = ctf_packet_index_len(major, minor);
-
- index_file = zmalloc(sizeof(*index_file));
- if (!index_file) {
- PERROR("allocating lttng_index_file");
- goto error;
- }
-
- ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR,
- path_name);
- if (ret < 0) {
- PERROR("snprintf index path");
- goto error;
- }
-
- /* Create index directory if necessary. */
- ret = utils_mkdir(fullpath, S_IRWXU | S_IRWXG, uid, gid);
- if (ret < 0) {
- if (errno != EEXIST) {
- PERROR("Index trace directory creation error");
- goto error;
- }
- }
-
- /*
- * For tracefile rotation. We need to unlink the old
- * file if present to synchronize with the tail of the
- * live viewer which could be working on this same file.
- * By doing so, any reference to the old index file
- * stays valid even if we re-create a new file with the
- * same name afterwards.
- */
- ret = utils_unlink_stream_file(fullpath, stream_name, size, count, uid,
- gid, DEFAULT_INDEX_FILE_SUFFIX);
- if (ret < 0 && errno != ENOENT) {
- goto error;
- }
- ret = utils_create_stream_file(fullpath, stream_name, size, count, uid,
- gid, DEFAULT_INDEX_FILE_SUFFIX);
- if (ret < 0) {
- goto error;
- }
- fd = ret;
-
- ctf_packet_index_file_hdr_init(&hdr, major, minor);
- size_ret = lttng_write(fd, &hdr, sizeof(hdr));
- if (size_ret < sizeof(hdr)) {
- PERROR("write index header");
- goto error;
- }
- index_file->fd = fd;
- index_file->major = major;
- index_file->minor = minor;
- index_file->element_len = element_len;
- urcu_ref_init(&index_file->ref);
-
- return index_file;
-
-error:
- if (fd >= 0) {
- int close_ret;
-
- close_ret = close(fd);
- if (close_ret < 0) {
- PERROR("close index fd");
- }
- }
- free(index_file);
- return NULL;
-}
-
struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
struct lttng_trace_chunk *chunk,
const char *channel_path, char *stream_name,
* stays valid even if we re-create a new file with the
* same name afterwards.
*/
- chunk_status = lttng_trace_chunk_unlink_file(chunk,
- index_file_path);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ chunk_status = lttng_trace_chunk_unlink_file(
+ chunk, index_file_path);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK &&
+ !(chunk_status == LTTNG_TRACE_CHUNK_STATUS_ERROR &&
+ errno == ENOENT)) {
goto error;
}
- }
+ }
chunk_status = lttng_trace_chunk_open_file(chunk, index_file_path,
flags, mode, &fd);
* create and open have refcount of 1. Use put to decrement the
* refcount. Destroys when reaching 0. Use "get" to increment refcount.
*/
-struct lttng_index_file *lttng_index_file_create(const char *path_name,
- char *stream_name, int uid, int gid, uint64_t size,
- uint64_t count, uint32_t major, uint32_t minor);
struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
struct lttng_trace_chunk *chunk,
const char *channel_path, char *stream_name,
msg->tracefile_size = htobe64(tracefile_size);
msg->tracefile_count = htobe64(tracefile_count);
- msg->trace_archive_id = htobe64(trace_archive_id);
+ msg->trace_chunk_id = htobe64(trace_archive_id);
/* Send command */
ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
uint32_t pathname_len;
uint64_t tracefile_size;
uint64_t tracefile_count;
- uint64_t trace_archive_id;
+ uint64_t trace_chunk_id;
char names[];
} LTTNG_PACKED;