Observed issue
==============
In the following scenario:
lttng create --snapshot
lttng enable-event -u -a
lttng start
taskset -c 0 <tracepoint producing app>
lttng clear
taskset -c 0 <tracepoint producing app>
lttng snapshot record
lttng destroy
When using the stream-intersection mode, Babeltrace complains that the
time range of the intersection is invalid since the beginning timestamp
is after the end timestamp.
This is caused by the presence of "inactive" streams for which no events
are recorded between the clear action and the recording of the snapshot.
These streams have a begin timestamp roughly equal to the moment when
the snapshot was taken (i.e. the end timestamp). Babeltrace, in
stream-intersection mode, attempts to use the latest beginning timestamp
of all streams as the start of the intersection and the earliest end
timestamp as the end boundary.
Path                                                        │ File size │ Packets │ Timestamp: beginning          │ Timestamp: end                │
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_0 │ 4.000 KiB │       1 │ 2020-06-22 21:26:01.903685878 │ 2020-06-22 21:26:17.630456312 │
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_1 │ 4.000 KiB │       1 │ 2020-06-22 21:26:17.630909310 │ 2020-06-22 21:26:17.630909310 │
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_2 │ 4.000 KiB │       1 │ 2020-06-22 21:26:17.631295033 │ 2020-06-22 21:26:17.631295033 │
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_3 │ 4.000 KiB │       1 │ 2020-06-22 21:26:17.631673614 │ 2020-06-22 21:26:17.631673614 │
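
For reference, here is a minimal sketch of how a stream-intersection
range is derived from per-stream bounds (hypothetical types and helper,
not Babeltrace's actual code); with the inactive streams listed above,
max(begin) lands after min(end), hence the error:

#include <stddef.h>
#include <stdint.h>

struct stream_bounds {
	uint64_t begin_ns;
	uint64_t end_ns;
};

/* Returns 0 on success, -1 if the resulting range is invalid. */
static int compute_intersection(const struct stream_bounds *bounds,
		size_t count, struct stream_bounds *result)
{
	size_t i;

	result->begin_ns = 0;
	result->end_ns = UINT64_MAX;

	for (i = 0; i < count; i++) {
		/* Latest beginning timestamp of all streams. */
		if (bounds[i].begin_ns > result->begin_ns) {
			result->begin_ns = bounds[i].begin_ns;
		}

		/* Earliest end timestamp of all streams. */
		if (bounds[i].end_ns < result->end_ns) {
			result->end_ns = bounds[i].end_ns;
		}
	}

	/*
	 * An inactive stream whose first packet only opens at snapshot
	 * time pushes max(begin) past min(end).
	 */
	return result->begin_ns <= result->end_ns ? 0 : -1;
}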
Cause
=====
The packet beginning timestamps of the buffers are initialized on
creation (on the first "start" of a tracing session). When a "clear" is
performed on a session, all open packets are closed and the existing
contents are purged.
If a stream is inactive, it is possible for no packet to be "opened"
until a snapshot of the tracing session is recorded.
Solution
========
A new consumer command, "open channel packets", is added. This command
performs a "flush empty" operation on all streams of a channel.
The command is invoked on start and after a clear (once tracing is
restarted). This ensures that streams are opened as soon as possible
after a session start, a clear, or a rotation.
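
A condensed sketch of the intent, assuming the session daemon's
internal headers (function name hypothetical; the actual call sites are
in the start and clear paths of the hunks below):

/*
 * Sketch only: after tracing is (re)started, ask every domain to open
 * a packet in each of its channels' streams.
 */
static enum lttng_error_code open_packets_after_activation(
		struct ltt_session *session)
{
	enum lttng_error_code ret_code;

	/* ... start- or clear-specific work ... */

	ret_code = session_open_packets(session);
	if (ret_code != LTTNG_OK) {
		goto end;
	}

	ret_code = LTTNG_OK;
end:
	return ret_code;
}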
Known drawbacks
===============
In the case of an inactive stream, this results in an extra empty
packet (typically 4 kB) at the beginning of the stream in the
snapshots.
In the case of an active stream, this change causes the first packet to
be empty or to contain few events. If the stream is active enough to
wrap around, that empty packet will simply be overwritten.
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I19b5c630fa8bddfb13c3c10f86c6cc9dc4990b08
goto end;
}
}
+
+ /*
+ * Open a packet in every stream of the session to ensure that
+ * viewers can correctly identify the boundaries of the periods
+ * during which tracing was active for this session.
+ */
+ ret = session_open_packets(session);
+ if (ret != LTTNG_OK) {
+ goto end;
+ }
}
ret = LTTNG_OK;
end:
}
}
+ /*
+ * Open a packet in every stream of the session to ensure that viewers
+ * can correctly identify the boundaries of the periods during which
+ * tracing was active for this session.
+ */
+ ret = session_open_packets(session);
+ if (ret != LTTNG_OK) {
+ goto error;
+ }
+
/*
* Clear the flag that indicates that a rotation was done while the
* session was stopped.
*
* Return 0 on success else a negative value on error.
*/
-int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len)
+int consumer_socket_send(
+ struct consumer_socket *socket, const void *msg, size_t len)
{
int fd;
ssize_t size;
* The consumer socket lock must be held by the caller.
*/
int consumer_send_msg(struct consumer_socket *sock,
- struct lttcomm_consumer_msg *msg)
+ const struct lttcomm_consumer_msg *msg)
{
int ret;
return ret;
}
+int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key)
+{
+ int ret;
+ const struct lttcomm_consumer_msg msg = {
+ .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
+ .u.open_channel_packets.key = key,
+ };
+
+ assert(socket);
+
+ DBG("Consumer open channel packets: channel key = %" PRIu64, key);
+
+ health_code_update();
+
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_send_msg(socket, &msg);
+ pthread_mutex_unlock(socket->lock);
+ if (ret < 0) {
+ goto error_socket;
+ }
+
+error_socket:
+ health_code_update();
+ return ret;
+}
+
int consumer_clear_channel(struct consumer_socket *socket, uint64_t key)
{
int ret;
int consumer_copy_sockets(struct consumer_output *dst,
struct consumer_output *src);
void consumer_destroy_output_sockets(struct consumer_output *obj);
-int consumer_socket_send(struct consumer_socket *socket, void *msg,
+int consumer_socket_send(struct consumer_socket *socket, const void *msg,
size_t len);
int consumer_socket_recv(struct consumer_socket *socket, void *msg,
size_t len);
int consumer_send_fds(struct consumer_socket *sock, const int *fds,
size_t nb_fd);
int consumer_send_msg(struct consumer_socket *sock,
- struct lttcomm_consumer_msg *msg);
+ const struct lttcomm_consumer_msg *msg);
int consumer_send_stream(struct consumer_socket *sock,
struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
const int *fds, size_t nb_fd);
uint64_t relayd_id, uint64_t session_id,
struct lttng_trace_chunk *chunk,
enum consumer_trace_chunk_exists_status *result);
+int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key);
char *setup_channel_trace_path(struct consumer_output *consumer,
const char *session_path, size_t *consumer_path_offset);
return ret;
}
+/*
+ * This function skips the metadata channel as the begin/end timestamps of a
+ * metadata packet are useless.
+ *
+ * Moreover, opening a packet after a "clear" will cause problems for live
+ * sessions as it will introduce padding that was not part of the first trace
+ * chunk. The relay daemon expects the content of the metadata stream of
+ * successive metadata trace chunks to be strict supersets of one another.
+ *
+ * For example, flushing a packet at the beginning of the metadata stream of
+ * a trace chunk resulting from a "clear" session command will cause the
+ * size of the metadata stream of the new trace chunk to not match the size of
+ * the metadata stream of the original chunk. This will confuse the relay
+ * daemon as the same "offset" in a metadata stream will no longer point
+ * to the same content.
+ */
+static
+enum lttng_error_code session_kernel_open_packets(struct ltt_session *session)
+{
+ enum lttng_error_code ret = LTTNG_OK;
+ struct consumer_socket *socket;
+ struct lttng_ht_iter iter;
+ struct cds_lfht_node *node;
+ struct ltt_kernel_channel *chan;
+
+ rcu_read_lock();
+
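+	/* Kernel sessions only have one consumer socket; use the first entry. */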
+ cds_lfht_first(session->kernel_session->consumer->socks->ht, &iter.iter);
+ node = cds_lfht_iter_get_node(&iter.iter);
+ socket = container_of(node, typeof(*socket), node.node);
+
+ cds_list_for_each_entry(chan,
+ &session->kernel_session->channel_list.head, list) {
+ int open_ret;
+
+ DBG("Open packet of kernel channel: channel key = %" PRIu64
+ ", session name = %s, session_id = %" PRIu64,
+ chan->key, session->name, session->id);
+
+ open_ret = consumer_open_channel_packets(socket, chan->key);
+ if (open_ret < 0) {
+ /* General error (no known error expected). */
+ ret = LTTNG_ERR_UNK;
+ goto end;
+ }
+ }
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+enum lttng_error_code session_open_packets(struct ltt_session *session)
+{
+ enum lttng_error_code ret = LTTNG_OK;
+
+ DBG("Opening packets of session channels: session name = %s, session id = %" PRIu64,
+ session->name, session->id);
+
+ if (session->ust_session) {
+ ret = ust_app_open_packets(session);
+ if (ret != LTTNG_OK) {
+ goto end;
+ }
+ }
+
+ if (session->kernel_session) {
+ ret = session_kernel_open_packets(session);
+ if (ret != LTTNG_OK) {
+ goto end;
+ }
+ }
+
+end:
+ return ret;
+}
+
/*
* Set a session's current trace chunk.
*
enum lttng_trace_chunk_command_type close_command,
char *path);
+/* Open a packet in all channels of a given session. */
+enum lttng_error_code session_open_packets(struct ltt_session *session);
+
bool session_output_supports_trace_chunks(const struct ltt_session *session);
#endif /* _LTT_SESSION_H */
rcu_read_unlock();
return cmd_ret;
}
+
+/*
+ * This function skips the metadata channel as the begin/end timestamps of a
+ * metadata packet are useless.
+ *
+ * Moreover, opening a packet after a "clear" will cause problems for live
+ * sessions as it will introduce padding that was not part of the first trace
+ * chunk. The relay daemon expects the content of the metadata stream of
+ * successive metadata trace chunks to be strict supersets of one another.
+ *
+ * For example, flushing a packet at the beginning of the metadata stream of
+ * a trace chunk resulting from a "clear" session command will cause the
+ * size of the metadata stream of the new trace chunk to not match the size of
+ * the metadata stream of the original chunk. This will confuse the relay
+ * daemon as the same "offset" in a metadata stream will no longer point
+ * to the same content.
+ */
+enum lttng_error_code ust_app_open_packets(struct ltt_session *session)
+{
+ enum lttng_error_code ret = LTTNG_OK;
+ struct lttng_ht_iter iter;
+ struct ltt_ust_session *usess = session->ust_session;
+
+ assert(usess);
+
+ rcu_read_lock();
+
+ switch (usess->buffer_type) {
+ case LTTNG_BUFFER_PER_UID:
+ {
+ struct buffer_reg_uid *reg;
+
+ cds_list_for_each_entry (
+ reg, &usess->buffer_reg_uid_list, lnode) {
+ struct buffer_reg_channel *reg_chan;
+ struct consumer_socket *socket;
+
+ socket = consumer_find_socket_by_bitness(
+ reg->bits_per_long, usess->consumer);
+ if (!socket) {
+ ret = LTTNG_ERR_FATAL;
+ goto error;
+ }
+
+ cds_lfht_for_each_entry(reg->registry->channels->ht,
+ &iter.iter, reg_chan, node.node) {
+ const int open_ret =
+ consumer_open_channel_packets(
+ socket,
+ reg_chan->consumer_key);
+
+ if (open_ret < 0) {
+ ret = LTTNG_ERR_UNK;
+ goto error;
+ }
+ }
+ }
+ break;
+ }
+ case LTTNG_BUFFER_PER_PID:
+ {
+ struct ust_app *app;
+
+ cds_lfht_for_each_entry (
+ ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+ struct consumer_socket *socket;
+ struct lttng_ht_iter chan_iter;
+ struct ust_app_channel *ua_chan;
+ struct ust_app_session *ua_sess;
+ struct ust_registry_session *registry;
+
+ ua_sess = lookup_session_by_app(usess, app);
+ if (!ua_sess) {
+ /* Session not associated with this app. */
+ continue;
+ }
+
+ /* Get the right consumer socket for the application. */
+ socket = consumer_find_socket_by_bitness(
+ app->bits_per_long, usess->consumer);
+ if (!socket) {
+ ret = LTTNG_ERR_FATAL;
+ goto error;
+ }
+
+ registry = get_session_registry(ua_sess);
+ if (!registry) {
+ DBG("Application session is being torn down. Skip application.");
+ continue;
+ }
+
+ cds_lfht_for_each_entry(ua_sess->channels->ht,
+ &chan_iter.iter, ua_chan, node.node) {
+ const int open_ret =
+ consumer_open_channel_packets(
+ socket,
+ ua_chan->key);
+
+ if (open_ret < 0) {
+ /*
+ * Per-PID buffer and application going
+ * away.
+ */
+				if (open_ret == -LTTNG_ERR_CHAN_NOT_FOUND) {
+ continue;
+ }
+
+ ret = LTTNG_ERR_UNK;
+ goto error;
+ }
+ }
+ }
+ break;
+ }
+ default:
+ abort();
+ break;
+ }
+
+error:
+ rcu_read_unlock();
+ return ret;
+}
int ust_app_release_object(struct ust_app *app,
struct lttng_ust_object_data *data);
enum lttng_error_code ust_app_clear_session(struct ltt_session *session);
+enum lttng_error_code ust_app_open_packets(struct ltt_session *session);
static inline
int ust_app_supported(void)
return 0;
}
+static inline
+enum lttng_error_code ust_app_open_packets(struct ltt_session *session)
+{
+	return LTTNG_OK;
+}
+
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTT_UST_APP_H */
status = produced_pos_before != produced_pos_after ?
OPEN_PACKET_STATUS_OPENED :
OPEN_PACKET_STATUS_NO_SPACE;
+ if (status == OPEN_PACKET_STATUS_OPENED) {
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
end:
return status;
}
", channel name = %s, session id = %" PRIu64,
stream->key, stream->chan->name,
stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk =
- true;
break;
case OPEN_PACKET_STATUS_NO_SPACE:
/*
* Can't open a packet as there is no space left.
* This means that new events were produced, resulting
- * in a packet being opened, which is what we want
+ * in a packet being opened, which is what we wanted
* anyhow.
*/
DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
default:
abort();
}
-
- stream->opened_packet_in_current_trace_chunk = true;
}
sleep_stream:
", channel name = %s, session id = %" PRIu64,
stream->key, stream->chan->name,
stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk =
- true;
break;
case OPEN_PACKET_STATUS_NO_SPACE:
/*
end:
return ret;
}
+
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+ struct lttng_consumer_channel *channel)
+{
+ struct lttng_consumer_stream *stream;
+ enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
+
+ if (channel->metadata_stream) {
+ ERR("Open channel packets command attempted on a metadata channel");
+ ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end;
+ }
+
+ rcu_read_lock();
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ enum open_packet_status status;
+
+ pthread_mutex_lock(&stream->lock);
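+		/* Skip the stream if it is being deleted concurrently. */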
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
+ status = open_packet(stream);
+ switch (status) {
+ case OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64
+ ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key, channel->key,
+ channel->name, channel->session_id);
+ ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ goto error_unlock;
+ default:
+ abort();
+ }
+
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ return ret;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ goto end_rcu_unlock;
+}
LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
LTTNG_CONSUMER_CLEAR_CHANNEL,
+ LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
};
enum lttng_consumer_type {
struct lttng_consumer_local_data *ctx,
const lttng_uuid sessiond_uuid);
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+ struct lttng_consumer_channel *channel);
#endif /* LIB_CONSUMER_H */
msg.u.trace_chunk_exists.chunk_id);
goto end_msg_sessiond;
}
+ case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+ {
+ const uint64_t key = msg.u.open_channel_packets.key;
+ struct lttng_consumer_channel *channel =
+ consumer_find_channel(key);
+
+ if (channel) {
+ pthread_mutex_lock(&channel->lock);
+ ret_code = lttng_consumer_open_channel_packets(channel);
+ pthread_mutex_unlock(&channel->lock);
+ } else {
+ WARN("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+ goto end_msg_sessiond;
+ }
default:
goto end_nosignal;
}
LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE,/* Trace chunk exists on relay daemon. */
LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK, /* Unknown trace chunk. */
LTTCOMM_CONSUMERD_RELAYD_CLEAR_DISALLOWED, /* Relayd does not accept clear command. */
+ LTTCOMM_CONSUMERD_UNKNOWN_ERROR, /* Unknown error. */
/* MUST be last element */
LTTCOMM_NR, /* Last element */
struct {
uint64_t key;
} LTTNG_PACKED clear_channel;
+ struct {
+ uint64_t key;
+ } LTTNG_PACKED open_channel_packets;
} u;
} LTTNG_PACKED;
msg.u.trace_chunk_exists.chunk_id);
goto end_msg_sessiond;
}
+ case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+ {
+ const uint64_t key = msg.u.open_channel_packets.key;
+ struct lttng_consumer_channel *channel =
+ consumer_find_channel(key);
+
+ if (channel) {
+ pthread_mutex_lock(&channel->lock);
+ ret_code = lttng_consumer_open_channel_packets(channel);
+ pthread_mutex_unlock(&channel->lock);
+ } else {
+ /*
+ * The channel could have disappeared in per-pid
+ * buffering mode.
+ */
+ DBG("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+ goto end_msg_sessiond;
+ }
default:
break;
}