From: Julien Desfossez Date: Thu, 14 Dec 2017 18:38:20 +0000 (-0500) Subject: Consumer rotate stream X-Git-Tag: v2.11.0-rc1~350 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=d73bf3d793ee0b0c5b56cb47cb50c27d1789d3bd;p=lttng-tools.git Consumer rotate stream Perform the action of rotating a stream locally or send the command to do it on the relayd. Rotating a stream file consists of: - closing the current tracefile and index, - opening a new tracefile and index in the new chunk folder, - resetting the stream rotation flags, - updating the counter of streams waiting for a rotation in a channel, If the stream is a metadata stream, we also need to trigger the action to re-dump the content of the metadata cache after the rotation has been performed. The caller of lttng_consumer_rotate_stream() always calls consumer_post_rotation() after having released the stream lock to update the counter of streams waiting for a rotation in a channel and notifying the session daemon if this counter reaches 0. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 07de67578..8e6b63b01 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -2276,6 +2276,73 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } +static +int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, + uint64_t key) +{ + ssize_t ret; + + do { + ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + PERROR("Failed to write to the channel rotation pipe"); + } else { + DBG("Sent channel rotation notification for channel key %" + PRIu64, key); + ret = 0; + } + + return (int) ret; +} + +/* + * Perform operations that need to be done after a stream has + * rotated and released the stream lock. + * + * Multiple rotations cannot occur simultaneously, so we know the state of the + * "rotated" stream flag cannot change. + * + * This MUST be called WITHOUT the stream lock held. + */ +static +int consumer_post_rotation(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + pthread_mutex_lock(&stream->chan->lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * The ust_metadata_pushed counter has been reset to 0, so now + * we can wakeup the metadata thread so it dumps the metadata + * cache to the new file. + */ + if (stream->metadata_flag) { + consumer_metadata_wakeup_pipe(stream->chan); + } + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + + if (--stream->chan->nr_stream_rotate_pending == 0) { + DBG("Rotation of channel \"%s\" completed, notifying the session daemon", + stream->chan->name); + ret = rotate_notify_sessiond(ctx, stream->chan->key); + } + assert(stream->chan->nr_stream_rotate_pending >= 0); + pthread_mutex_unlock(&stream->chan->lock); + + return ret; +} + /* * Thread polls on metadata file descriptor and write them on disk or on the * network. @@ -3836,6 +3903,167 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } +/* + * Reset the state for a stream after a rotation occurred. + */ +void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream) +{ + stream->rotate_position = 0; + stream->rotate_ready = false; +} + +/* + * Perform the rotation a local stream file. + */ +int rotate_local_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + + DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s", + stream->key, + stream->chan->key, + stream->channel_read_only_attributes.path); + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing trace file (fd %d), stream %" PRIu64, + stream->out_fd, stream->key); + assert(0); + goto error; + } + + ret = utils_create_stream_file( + stream->channel_read_only_attributes.path, + stream->name, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + stream->uid, stream->gid, NULL); + if (ret < 0) { + ERR("Rotate create stream file"); + goto error; + } + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + struct lttng_index_file *index_file; + + lttng_index_file_put(stream->index_file); + + index_file = lttng_index_file_create( + stream->channel_read_only_attributes.path, + stream->name, stream->uid, stream->gid, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { + ERR("Create index file during rotation"); + goto error; + } + stream->index_file = index_file; + stream->out_fd_offset = 0; + } + + ret = 0; + goto end; + +error: + ret = -1; +end: + return ret; + +} + +/* + * Perform the rotation a stream file on the relay. + */ +int rotate_relay_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + DBG("Rotate relay stream"); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_stream(&relayd->control_sock, + stream->relayd_stream_id, + stream->channel_read_only_attributes.path, + stream->chan->current_chunk_id, + stream->last_sequence_number); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret) { + ERR("Rotate relay stream"); + } + +end: + return ret; +} + +/* + * Performs the stream rotation for the rotate session feature if needed. + * It must be called with the stream lock held. + * + * Return 0 on success, a negative number of error. + */ +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + + DBG("Consumer rotate stream %" PRIu64, stream->key); + + if (stream->net_seq_idx != (uint64_t) -1ULL) { + ret = rotate_relay_stream(ctx, stream); + } else { + ret = rotate_local_stream(ctx, stream); + } + if (ret < 0) { + ERR("Rotate stream"); + goto error; + } + + if (stream->metadata_flag) { + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Reset the position of what has been read from the metadata + * cache to 0 so we can dump it again. + */ + ret = kernctl_metadata_cache_dump(stream->wait_fd); + if (ret < 0) { + ERR("Failed to dump the kernel metadata cache after rotation"); + goto error; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Reset the position pushed from the metadata cache so it + * will write from the beginning on the next push. + */ + stream->ust_metadata_pushed = 0; + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + } + lttng_consumer_reset_stream_rotate_state(stream); + + ret = 0; + +error: + return ret; +} + static int rotate_rename_local(const char *old_path, const char *new_path, uid_t uid, gid_t gid) diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 118794257..9e77ff999 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -229,6 +229,20 @@ struct lttng_consumer_channel { uint64_t lost_packets; bool streams_sent_to_relayd; + + /* + * Account how many streams are waiting for their rotation to be + * complete. When this number reaches 0, we inform the session + * daemon that this channel has finished its rotation. + */ + uint64_t nr_stream_rotate_pending; + + /* + * The chunk id where we currently write the data. This value is sent + * to the relay when we add a stream and when a stream rotates. This + * allows to keep track of where each stream on the relay is writing. + */ + uint64_t current_chunk_id; }; /* @@ -789,8 +803,11 @@ void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); void consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream); int lttng_consumer_rotate_rename(const char *current_path, const char *new_path, uid_t uid, gid_t gid, uint64_t relayd_id); +void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid, uint64_t relayd_id); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 458553bfe..242abbedf 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -942,6 +942,84 @@ error: return ret; } +int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, + const char *new_pathname, uint64_t new_chunk_id, + uint64_t seq_num) +{ + int ret; + struct lttcomm_relayd_rotate_stream *msg = NULL; + struct lttcomm_relayd_generic_reply reply; + size_t len; + int msg_len; + + /* Code flow error. Safety net. */ + assert(rsock); + + DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id); + + /* Account for the trailing NULL. */ + len = strnlen(new_pathname, LTTNG_PATH_MAX) + 1; + if (len > LTTNG_PATH_MAX) { + ERR("Path used in relayd rotate stream command exceeds the maximal allowed length"); + ret = -1; + goto error; + } + + msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len; + msg = zmalloc(msg_len); + if (!msg) { + PERROR("Failed to allocate relayd rotate stream command of %d bytes", + msg_len); + ret = -1; + goto error; + } + + if (lttng_strncpy(msg->new_pathname, new_pathname, len)) { + ret = -1; + ERR("Failed to copy relayd rotate stream command's new path name"); + goto error; + } + + msg->pathname_length = htobe32(len); + msg->stream_id = htobe64(stream_id); + msg->new_chunk_id = htobe64(new_chunk_id); + /* + * The seq_num is invalid for metadata streams, but it is ignored on + * the relay. + */ + msg->rotate_at_seq_num = htobe64(seq_num); + + /* Send command. */ + ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0); + if (ret < 0) { + ERR("Send rotate command"); + goto error; + } + + /* Receive response. */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + ERR("Receive rotate reply"); + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd rotate stream replied error %d", reply.ret_code); + } else { + /* Success. */ + ret = 0; + DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id); + } + +error: + free(msg); + return ret; +} + int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock, const char *old_path, const char *new_path) { diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 5360ae7c8..f6329eb5b 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -51,6 +51,8 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock, uint64_t net_seq_num); int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); +int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, + const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num); int relayd_rotate_rename(struct lttcomm_relayd_sock *sock, const char *current_path, const char *new_path); int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path); diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 9a733df1e..e9a7e9ff2 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -197,6 +197,7 @@ struct lttcomm_relayd_reset_metadata { struct lttcomm_relayd_rotate_stream { uint64_t stream_id; + /* Ignored for metadata streams. */ uint64_t rotate_at_seq_num; uint64_t new_chunk_id; /* Includes trailing NULL. */