X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=9db9d085001f5ca1c6979bf58125301c255f4266;hb=f618151398055a753d5c5fb519d98a166ddde09f;hp=4ace64920109dd38e58e75dad6452a78518b47b8;hpb=fa29bfbf73e837b936d80b4d5a1206dfb8496f07;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4ace64920..9db9d0850 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2017 Jérémie Galarneau * @@ -136,10 +136,34 @@ error: return ret; } +static void finalize_snapshot_stream( + struct lttng_consumer_stream *stream, uint64_t relayd_id) +{ + ASSERT_LOCKED(stream->lock); + + if (relayd_id == (uint64_t) -1ULL) { + if (stream->out_fd >= 0) { + const int ret = close(stream->out_fd); + + if (ret < 0) { + PERROR("Failed to close stream snapshot output file descriptor"); + } + + stream->out_fd = -1; + } + } else { + close_relayd_stream(stream); + stream->net_seq_idx = (uint64_t) -1ULL; + } + + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; +} + /* * Take a snapshot of all the stream of a channel * RCU read-side lock must be held across this function to ensure existence of - * channel. The channel lock must be held by the caller. + * channel. * * Returns 0 on success, < 0 on error */ @@ -154,6 +178,9 @@ static int lttng_kconsumer_snapshot_channel( DBG("Kernel consumer snapshot channel %" PRIu64, key); + /* Prevent channel modifications while we perform the snapshot.*/ + pthread_mutex_lock(&channel->lock); + rcu_read_lock(); /* Splice is not supported yet for channel snapshot. */ @@ -197,13 +224,13 @@ static int lttng_kconsumer_snapshot_channel( ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { ERR("sending stream to relayd"); - goto end_unlock; + goto error_finalize_stream; } } else { ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto end_unlock; + goto error_finalize_stream; } DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key); @@ -221,7 +248,7 @@ static int lttng_kconsumer_snapshot_channel( ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); - goto end_unlock; + goto error_finalize_stream; } goto end_unlock; } @@ -229,19 +256,19 @@ static int lttng_kconsumer_snapshot_channel( ret = lttng_kconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking kernel snapshot"); - goto end_unlock; + goto error_finalize_stream; } ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced kernel snapshot position"); - goto end_unlock; + goto error_finalize_stream; } ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd kernel snapshot position"); - goto end_unlock; + goto error_finalize_stream; } consumed_pos = consumer_get_consume_start_pos(consumed_pos, @@ -261,7 +288,7 @@ static int lttng_kconsumer_snapshot_channel( if (ret < 0) { if (ret != -EAGAIN) { PERROR("kernctl_get_subbuf snapshot"); - goto end_unlock; + goto error_finalize_stream; } DBG("Kernel consumer get subbuf failed. Skipping it."); consumed_pos += stream->max_sb_size; @@ -311,26 +338,12 @@ static int lttng_kconsumer_snapshot_channel( ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { ERR("Snapshot kernctl_put_subbuf"); - goto end_unlock; + goto error_finalize_stream; } consumed_pos += stream->max_sb_size; } - if (relayd_id == (uint64_t) -1ULL) { - if (stream->out_fd >= 0) { - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot close out_fd"); - goto end_unlock; - } - stream->out_fd = -1; - } - } else { - close_relayd_stream(stream); - stream->net_seq_idx = (uint64_t) -1ULL; - } - lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; + finalize_snapshot_stream(stream, relayd_id); pthread_mutex_unlock(&stream->lock); } @@ -343,17 +356,20 @@ error_put_subbuf: if (ret < 0) { ERR("Snapshot kernctl_put_subbuf error path"); } +error_finalize_stream: + finalize_snapshot_stream(stream, relayd_id); end_unlock: pthread_mutex_unlock(&stream->lock); end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->lock); return ret; } /* * Read the whole metadata available for a snapshot. * RCU read-side lock must be held across this function to ensure existence of - * metadata_channel. The channel lock must be held by the caller. + * metadata_channel. * * Returns 0 on success, < 0 on error */ @@ -376,7 +392,7 @@ static int lttng_kconsumer_snapshot_metadata( metadata_stream = metadata_channel->metadata_stream; assert(metadata_stream); - pthread_mutex_lock(&metadata_stream->lock); + metadata_stream->read_subbuffer_ops.lock(metadata_stream); assert(metadata_channel->trace_chunk); assert(metadata_stream->trace_chunk); @@ -403,16 +419,12 @@ static int lttng_kconsumer_snapshot_metadata( ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret_read < 0) { - if (ret_read != -EAGAIN) { - ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", - ret_read); - ret = ret_read; - goto error_snapshot; - } - /* ret_read is negative at this point so we will exit the loop. */ - continue; + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", + ret_read); + ret = ret_read; + goto error_snapshot; } - } while (ret_read >= 0); + } while (ret_read > 0); if (use_relayd) { close_relayd_stream(metadata_stream); @@ -435,8 +447,7 @@ static int lttng_kconsumer_snapshot_metadata( ret = 0; error_snapshot: - pthread_mutex_unlock(&metadata_stream->lock); - cds_list_del(&metadata_stream->send_node); + metadata_stream->read_subbuffer_ops.unlock(metadata_stream); consumer_stream_destroy(metadata_stream, NULL); metadata_channel->metadata_stream = NULL; rcu_read_unlock(); @@ -484,17 +495,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + uint32_t major = msg.u.relayd_sock.major; + uint32_t minor = msg.u.relayd_sock.minor; + enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto) + msg.u.relayd_sock.relayd_socket_protocol; + /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id); + msg.u.relayd_sock.type, ctx, sock, + consumer_sockpoll, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, major, + minor, protocol); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; - int ret_send_status, ret_add_channel; + int ret_send_status, ret_add_channel = 0; const uint64_t chunk_id = msg.u.channel.chunk_id.value; health_code_update(); @@ -965,7 +982,6 @@ error_streams_sent_nosignal: ERR("Channel %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; } else { - pthread_mutex_lock(&channel->lock); if (msg.u.snapshot_channel.metadata == 1) { int ret_snapshot; @@ -993,7 +1009,6 @@ error_streams_sent_nosignal: ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; } } - pthread_mutex_unlock(&channel->lock); } health_code_update(); @@ -1587,68 +1602,94 @@ end: } static -int get_subbuffer_common(struct lttng_consumer_stream *stream, +enum get_next_subbuffer_status get_subbuffer_common( + struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { int ret; + enum get_next_subbuffer_status status; ret = kernctl_get_next_subbuf(stream->wait_fd); - if (ret) { + switch (ret) { + case 0: + status = GET_NEXT_SUBBUFFER_STATUS_OK; + break; + case -ENODATA: + case -EAGAIN: + /* + * The caller only expects -ENODATA when there is no data to + * read, but the kernel tracer returns -EAGAIN when there is + * currently no data for a non-finalized stream, and -ENODATA + * when there is no data for a finalized stream. Those can be + * combined into a -ENODATA return value. + */ + status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA; + goto end; + default: + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; goto end; } ret = stream->read_subbuffer_ops.extract_subbuffer_info( - stream, subbuffer); + stream, subbuffer); + if (ret) { + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; + } end: - return ret; + return status; } static -int get_next_subbuffer_splice(struct lttng_consumer_stream *stream, +enum get_next_subbuffer_status get_next_subbuffer_splice( + struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { - int ret; + const enum get_next_subbuffer_status status = + get_subbuffer_common(stream, subbuffer); - ret = get_subbuffer_common(stream, subbuffer); - if (ret) { + if (status != GET_NEXT_SUBBUFFER_STATUS_OK) { goto end; } subbuffer->buffer.fd = stream->wait_fd; end: - return ret; + return status; } static -int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream, +enum get_next_subbuffer_status get_next_subbuffer_mmap( + struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { int ret; + enum get_next_subbuffer_status status; const char *addr; - ret = get_subbuffer_common(stream, subbuffer); - if (ret) { + status = get_subbuffer_common(stream, subbuffer); + if (status != GET_NEXT_SUBBUFFER_STATUS_OK) { goto end; } ret = get_current_subbuf_addr(stream, &addr); if (ret) { + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; goto end; } subbuffer->buffer.buffer = lttng_buffer_view_init( addr, 0, subbuffer->info.data.padded_subbuf_size); end: - return ret; + return status; } static -int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, +enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { int ret; const char *addr; bool coherent; + enum get_next_subbuffer_status status; ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, &coherent); @@ -1675,7 +1716,33 @@ int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, subbuffer->info.metadata.padded_subbuf_size, coherent ? "true" : "false"); end: - return ret; + /* + * The caller only expects -ENODATA when there is no data to read, but + * the kernel tracer returns -EAGAIN when there is currently no data + * for a non-finalized stream, and -ENODATA when there is no data for a + * finalized stream. Those can be combined into a -ENODATA return value. + */ + switch (ret) { + case 0: + status = GET_NEXT_SUBBUFFER_STATUS_OK; + break; + case -ENODATA: + case -EAGAIN: + /* + * The caller only expects -ENODATA when there is no data to + * read, but the kernel tracer returns -EAGAIN when there is + * currently no data for a non-finalized stream, and -ENODATA + * when there is no data for a finalized stream. Those can be + * combined into a -ENODATA return value. + */ + status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA; + break; + default: + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; + break; + } + + return status; } static @@ -1699,8 +1766,23 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream, static bool is_get_next_check_metadata_available(int tracer_fd) { - return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) != - -ENOTTY; + const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL); + const bool available = ret != -ENOTTY; + + if (ret == 0) { + /* get succeeded, make sure to put the subbuffer. */ + kernctl_put_subbuf(tracer_fd); + } + + return available; +} + +static +int signal_metadata(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + ASSERT_LOCKED(stream->metadata_rdv_lock); + return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } static @@ -1733,6 +1815,8 @@ int lttng_kconsumer_set_stream_ops( metadata_bucket_destroy(stream->metadata_bucket); stream->metadata_bucket = NULL; } + + stream->read_subbuffer_ops.on_sleep = signal_metadata; } if (!stream->read_subbuffer_ops.get_next_subbuffer) {