From: Jérémie Galarneau Date: Fri, 26 Jun 2020 22:40:12 +0000 (-0400) Subject: Fix: stream intersection fails on snapshot of cleared session X-Git-Tag: v2.12.2~20 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=1223361a0969c5d67dadc23dc3199baa1cf89b03;p=lttng-tools.git Fix: stream intersection fails on snapshot of cleared session Observed issue ============== In the following scenario: lttng create --snapshot lttng enable-event -u -a lttng start taskset -c 0 lttng clear taskset -c 0 lttng snapshot record lttng destroy When using the stream-intersection mode, babetrace complains that the time range for the intersection is invalid since the begin timestamp is after the end timestamp. This is caused by the presence of "inactive" streams for which no events are recorded between the clear action and the recording of the snapshot. These streams have a begin timestamp roughly equal to the moment when the snapshot was taken (i.e. the end timestamp). Babeltrace, in stream-intersection mode, attempts to use the latest beginning timestamp of all streams as the start of the intersection and the earliest end timestamp as the end boundary. Path │File size │Packets │Timestamp: beginning │Timestamp: end │ snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_0 │ 4.000 KiB│ 1│2020-06-22 21:26:01.903685878│2020-06-22 21:26:17.630456312│ snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_1 │ 4.000 KiB│ 1│2020-06-22 21:26:17.630909310│2020-06-22 21:26:17.630909310│ snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_2 │ 4.000 KiB│ 1│2020-06-22 21:26:17.631295033│2020-06-22 21:26:17.631295033│ snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_3 │ 4.000 KiB│ 1│2020-06-22 21:26:17.631673614│2020-06-22 21:26:17.631673614│ Cause ===== The packet beginning timestamps of the buffers are initialized on creation (on the first "start" of a tracing session). When a "clear" is performed on a session, all open packets are closed and the existing contents are purged. If a stream is inactive, it is possible for no packet to be "opened" until a snapshot of the tracing session is recorded. Solution ======== A new consumer command, "open channel packets" is added. This command performs a "flush empty" operation on all streams of a channel. This command is invoked after a clear (after the tracing is re-started) and on start. This ensures that streams are opened as soon as possible after a clear, a rotation, or a session start. Known drawbacks =============== In the case of an inactive stream, this results an extra empty packet at the beginning of the inactive streams (typically 4kB) in the snapshots. In the case of an active stream, this change will cause the first packet to be empty or contain few events. If the stream is active enough to wrap-around, that empty packet will simply be overwritten. Signed-off-by: Jérémie Galarneau Change-Id: I19b5c630fa8bddfb13c3c10f86c6cc9dc4990b08 --- diff --git a/src/bin/lttng-sessiond/clear.c b/src/bin/lttng-sessiond/clear.c index 03abbd8e1..3ae70ea2f 100644 --- a/src/bin/lttng-sessiond/clear.c +++ b/src/bin/lttng-sessiond/clear.c @@ -194,6 +194,16 @@ int cmd_clear_session(struct ltt_session *session, int *sock_fd) goto end; } } + + /* + * Open a packet in every stream of the session to ensure that + * viewers can correctly identify the boundaries of the periods + * during which tracing was active for this session. + */ + ret = session_open_packets(session); + if (ret != LTTNG_OK) { + goto end; + } } ret = LTTNG_OK; end: diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index e45d4423a..8f2afc326 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -2791,6 +2791,16 @@ int cmd_start_trace(struct ltt_session *session) } } + /* + * Open a packet in every stream of the session to ensure that viewers + * can correctly identify the boundaries of the periods during which + * tracing was active for this session. + */ + ret = session_open_packets(session); + if (ret != LTTNG_OK) { + goto error; + } + /* * Clear the flag that indicates that a rotation was done while the * session was stopped. diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index d282f59c9..552ff95cb 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -93,7 +93,8 @@ error: * * Return 0 on success else a negative value on error. */ -int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len) +int consumer_socket_send( + struct consumer_socket *socket, const void *msg, size_t len) { int fd; ssize_t size; @@ -861,7 +862,7 @@ error: * The consumer socket lock must be held by the caller. */ int consumer_send_msg(struct consumer_socket *sock, - struct lttcomm_consumer_msg *msg) + const struct lttcomm_consumer_msg *msg) { int ret; @@ -1721,6 +1722,32 @@ error: return ret; } +int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key) +{ + int ret; + const struct lttcomm_consumer_msg msg = { + .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS, + .u.open_channel_packets.key = key, + }; + + assert(socket); + + DBG("Consumer open channel packets: channel key = %" PRIu64, key); + + health_code_update(); + + pthread_mutex_lock(socket->lock); + ret = consumer_send_msg(socket, &msg); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error_socket; + } + +error_socket: + health_code_update(); + return ret; +} + int consumer_clear_channel(struct consumer_socket *socket, uint64_t key) { int ret; diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 54c962a86..eaa04d302 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -187,7 +187,7 @@ void consumer_destroy_socket(struct consumer_socket *sock); int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src); void consumer_destroy_output_sockets(struct consumer_output *obj); -int consumer_socket_send(struct consumer_socket *socket, void *msg, +int consumer_socket_send(struct consumer_socket *socket, const void *msg, size_t len); int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len); @@ -202,7 +202,7 @@ int consumer_set_network_uri(const struct ltt_session *session, int consumer_send_fds(struct consumer_socket *sock, const int *fds, size_t nb_fd); int consumer_send_msg(struct consumer_socket *sock, - struct lttcomm_consumer_msg *msg); + const struct lttcomm_consumer_msg *msg); int consumer_send_stream(struct consumer_socket *sock, struct consumer_output *dst, struct lttcomm_consumer_msg *msg, const int *fds, size_t nb_fd); @@ -320,6 +320,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, uint64_t relayd_id, uint64_t session_id, struct lttng_trace_chunk *chunk, enum consumer_trace_chunk_exists_status *result); +int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key); char *setup_channel_trace_path(struct consumer_output *consumer, const char *session_path, size_t *consumer_path_offset); diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 03bec43fd..c3a65aafb 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -792,6 +792,83 @@ end: return ret; } +/* + * This function skips the metadata channel as the begin/end timestamps of a + * metadata packet are useless. + * + * Moreover, opening a packet after a "clear" will cause problems for live + * sessions as it will introduce padding that was not part of the first trace + * chunk. The relay daemon expects the content of the metadata stream of + * successive metadata trace chunks to be strict supersets of one another. + * + * For example, flushing a packet at the beginning of the metadata stream of + * a trace chunk resulting from a "clear" session command will cause the + * size of the metadata stream of the new trace chunk to not match the size of + * the metadata stream of the original chunk. This will confuse the relay + * daemon as the same "offset" in a metadata stream will no longer point + * to the same content. + */ +static +enum lttng_error_code session_kernel_open_packets(struct ltt_session *session) +{ + enum lttng_error_code ret = LTTNG_OK; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct cds_lfht_node *node; + struct ltt_kernel_channel *chan; + + rcu_read_lock(); + + cds_lfht_first(session->kernel_session->consumer->socks->ht, &iter.iter); + node = cds_lfht_iter_get_node(&iter.iter); + socket = container_of(node, typeof(*socket), node.node); + + cds_list_for_each_entry(chan, + &session->kernel_session->channel_list.head, list) { + int open_ret; + + DBG("Open packet of kernel channel: channel key = %" PRIu64 + ", session name = %s, session_id = %" PRIu64, + chan->key, session->name, session->id); + + open_ret = consumer_open_channel_packets(socket, chan->key); + if (open_ret < 0) { + /* General error (no known error expected). */ + ret = LTTNG_ERR_UNK; + goto end; + } + } + +end: + rcu_read_unlock(); + return ret; +} + +enum lttng_error_code session_open_packets(struct ltt_session *session) +{ + enum lttng_error_code ret = LTTNG_OK; + + DBG("Opening packets of session channels: session name = %s, session id = %" PRIu64, + session->name, session->id); + + if (session->ust_session) { + ret = ust_app_open_packets(session); + if (ret != LTTNG_OK) { + goto end; + } + } + + if (session->kernel_session) { + ret = session_kernel_open_packets(session); + if (ret != LTTNG_OK) { + goto end; + } + } + +end: + return ret; +} + /* * Set a session's current trace chunk. * diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 1df70a474..34e51fe5a 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -273,6 +273,9 @@ int session_close_trace_chunk(struct ltt_session *session, enum lttng_trace_chunk_command_type close_command, char *path); +/* Open a packet in all channels of a given session. */ +enum lttng_error_code session_open_packets(struct ltt_session *session); + bool session_output_supports_trace_chunks(const struct ltt_session *session); #endif /* _LTT_SESSION_H */ diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 8c314ec1c..befbfab32 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -6644,3 +6644,126 @@ end: rcu_read_unlock(); return cmd_ret; } + +/* + * This function skips the metadata channel as the begin/end timestamps of a + * metadata packet are useless. + * + * Moreover, opening a packet after a "clear" will cause problems for live + * sessions as it will introduce padding that was not part of the first trace + * chunk. The relay daemon expects the content of the metadata stream of + * successive metadata trace chunks to be strict supersets of one another. + * + * For example, flushing a packet at the beginning of the metadata stream of + * a trace chunk resulting from a "clear" session command will cause the + * size of the metadata stream of the new trace chunk to not match the size of + * the metadata stream of the original chunk. This will confuse the relay + * daemon as the same "offset" in a metadata stream will no longer point + * to the same content. + */ +enum lttng_error_code ust_app_open_packets(struct ltt_session *session) +{ + enum lttng_error_code ret = LTTNG_OK; + struct lttng_ht_iter iter; + struct ltt_ust_session *usess = session->ust_session; + + assert(usess); + + rcu_read_lock(); + + switch (usess->buffer_type) { + case LTTNG_BUFFER_PER_UID: + { + struct buffer_reg_uid *reg; + + cds_list_for_each_entry ( + reg, &usess->buffer_reg_uid_list, lnode) { + struct buffer_reg_channel *reg_chan; + struct consumer_socket *socket; + + socket = consumer_find_socket_by_bitness( + reg->bits_per_long, usess->consumer); + if (!socket) { + ret = LTTNG_ERR_FATAL; + goto error; + } + + cds_lfht_for_each_entry(reg->registry->channels->ht, + &iter.iter, reg_chan, node.node) { + const int open_ret = + consumer_open_channel_packets( + socket, + reg_chan->consumer_key); + + if (open_ret < 0) { + ret = LTTNG_ERR_UNK; + goto error; + } + } + } + break; + } + case LTTNG_BUFFER_PER_PID: + { + struct ust_app *app; + + cds_lfht_for_each_entry ( + ust_app_ht->ht, &iter.iter, app, pid_n.node) { + struct consumer_socket *socket; + struct lttng_ht_iter chan_iter; + struct ust_app_channel *ua_chan; + struct ust_app_session *ua_sess; + struct ust_registry_session *registry; + + ua_sess = lookup_session_by_app(usess, app); + if (!ua_sess) { + /* Session not associated with this app. */ + continue; + } + + /* Get the right consumer socket for the application. */ + socket = consumer_find_socket_by_bitness( + app->bits_per_long, usess->consumer); + if (!socket) { + ret = LTTNG_ERR_FATAL; + goto error; + } + + registry = get_session_registry(ua_sess); + if (!registry) { + DBG("Application session is being torn down. Skip application."); + continue; + } + + cds_lfht_for_each_entry(ua_sess->channels->ht, + &chan_iter.iter, ua_chan, node.node) { + const int open_ret = + consumer_open_channel_packets( + socket, + ua_chan->key); + + if (open_ret < 0) { + /* + * Per-PID buffer and application going + * away. + */ + if (ret == -LTTNG_ERR_CHAN_NOT_FOUND) { + continue; + } + + ret = LTTNG_ERR_UNK; + goto error; + } + } + } + break; + } + default: + abort(); + break; + } + +error: + rcu_read_unlock(); + return ret; +} diff --git a/src/bin/lttng-sessiond/ust-app.h b/src/bin/lttng-sessiond/ust-app.h index 6f3588a54..164bc2c6f 100644 --- a/src/bin/lttng-sessiond/ust-app.h +++ b/src/bin/lttng-sessiond/ust-app.h @@ -354,6 +354,7 @@ enum lttng_error_code ust_app_create_channel_subdirectories( int ust_app_release_object(struct ust_app *app, struct lttng_ust_object_data *data); enum lttng_error_code ust_app_clear_session(struct ltt_session *session); +enum lttng_error_code ust_app_open_packets(struct ltt_session *session); static inline int ust_app_supported(void) @@ -600,6 +601,12 @@ enum lttng_error_code ust_app_clear_session(struct ltt_session *session) return 0; } +static inline +enum lttng_error_code ust_app_open_packets(struct ltt_session *session) +{ + return 0; +} + #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTT_UST_APP_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 6505490cd..be440e694 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3425,6 +3425,9 @@ static enum open_packet_status open_packet(struct lttng_consumer_stream *stream) status = produced_pos_before != produced_pos_after ? OPEN_PACKET_STATUS_OPENED : OPEN_PACKET_STATUS_NO_SPACE; + if (status == OPEN_PACKET_STATUS_OPENED) { + stream->opened_packet_in_current_trace_chunk = true; + } end: return status; } @@ -3565,14 +3568,12 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, ", channel name = %s, session id = %" PRIu64, stream->key, stream->chan->name, stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = - true; break; case OPEN_PACKET_STATUS_NO_SPACE: /* * Can't open a packet as there is no space left. * This means that new events were produced, resulting - * in a packet being opened, which is what we want + * in a packet being opened, which is what we wanted * anyhow. */ DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 @@ -3588,8 +3589,6 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, default: abort(); } - - stream->opened_packet_in_current_trace_chunk = true; } sleep_stream: @@ -4294,8 +4293,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ", channel name = %s, session id = %" PRIu64, stream->key, stream->chan->name, stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = - true; break; case OPEN_PACKET_STATUS_NO_SPACE: /* @@ -5172,3 +5169,70 @@ int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel) end: return ret; } + +enum lttcomm_return_code lttng_consumer_open_channel_packets( + struct lttng_consumer_channel *channel) +{ + struct lttng_consumer_stream *stream; + enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS; + + if (channel->metadata_stream) { + ERR("Open channel packets command attempted on a metadata channel"); + ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS; + goto end; + } + + rcu_read_lock(); + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + enum open_packet_status status; + + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + + status = open_packet(stream); + switch (status) { + case OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case OPEN_PACKET_STATUS_NO_SPACE: + DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + break; + case OPEN_PACKET_STATUS_ERROR: + /* + * Only unexpected internal errors can lead to this + * failing. Report an unknown error. + */ + ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 + ", channel id = %" PRIu64 + ", channel name = %s" + ", session id = %" PRIu64, + stream->key, channel->key, + channel->name, channel->session_id); + ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; + goto error_unlock; + default: + abort(); + } + + next: + pthread_mutex_unlock(&stream->lock); + } + +end_rcu_unlock: + rcu_read_unlock(); +end: + return ret; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + goto end_rcu_unlock; +} diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 6dedb7525..73189660c 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -64,6 +64,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, LTTNG_CONSUMER_TRACE_CHUNK_EXISTS, LTTNG_CONSUMER_CLEAR_CHANNEL, + LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS, }; enum lttng_consumer_type { @@ -1048,5 +1049,7 @@ enum lttcomm_return_code lttng_consumer_init_command( struct lttng_consumer_local_data *ctx, const lttng_uuid sessiond_uuid); int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); +enum lttcomm_return_code lttng_consumer_open_channel_packets( + struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index eeec3a65e..de1a14c21 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1311,6 +1311,24 @@ error_rotate_channel: msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } + case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: + { + const uint64_t key = msg.u.open_channel_packets.key; + struct lttng_consumer_channel *channel = + consumer_find_channel(key); + + if (channel) { + pthread_mutex_lock(&channel->lock); + ret_code = lttng_consumer_open_channel_packets(channel); + pthread_mutex_unlock(&channel->lock); + } else { + WARN("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + goto end_msg_sessiond; + } default: goto end_nosignal; } diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 0b4c05fb7..cca48d6ab 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -179,6 +179,7 @@ enum lttcomm_return_code { LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE,/* Trace chunk exists on relay daemon. */ LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK, /* Unknown trace chunk. */ LTTCOMM_CONSUMERD_RELAYD_CLEAR_DISALLOWED, /* Relayd does not accept clear command. */ + LTTCOMM_CONSUMERD_UNKNOWN_ERROR, /* Unknown error. */ /* MUST be last element */ LTTCOMM_NR, /* Last element */ @@ -705,6 +706,9 @@ struct lttcomm_consumer_msg { struct { uint64_t key; } LTTNG_PACKED clear_channel; + struct { + uint64_t key; + } LTTNG_PACKED open_channel_packets; } u; } LTTNG_PACKED; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index ce2e17f74..209931f56 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2181,6 +2181,28 @@ end_rotate_channel_nosignal: msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } + case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: + { + const uint64_t key = msg.u.open_channel_packets.key; + struct lttng_consumer_channel *channel = + consumer_find_channel(key); + + if (channel) { + pthread_mutex_lock(&channel->lock); + ret_code = lttng_consumer_open_channel_packets(channel); + pthread_mutex_unlock(&channel->lock); + } else { + /* + * The channel could have disappeared in per-pid + * buffering mode. + */ + DBG("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + goto end_msg_sessiond; + } default: break; }