X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;h=48a01a352e7f5ccd268ac13f4120e2c9ba473f62;hb=f250b40e2179eccdb83766bf4abef5a35036c47b;hp=ea3e3619106654d556bdbf543afca0ce75308bd2;hpb=28ab034a2c3582d07d3423d2d746731f87d3969f;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index ea3e36191..48a01a352 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -63,7 +64,7 @@ static int add_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); - if (ctx->on_recv_channel != NULL) { + if (ctx->on_recv_channel != nullptr) { ret = ctx->on_recv_channel(channel); if (ret == 0) { ret = consumer_add_channel(channel, ctx); @@ -95,7 +96,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int *_alloc_ret) { int alloc_ret; - struct lttng_consumer_stream *stream = NULL; + struct lttng_consumer_stream *stream = nullptr; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); @@ -111,7 +112,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, &alloc_ret, channel->type, channel->monitor); - if (stream == NULL) { + if (stream == nullptr) { switch (alloc_ret) { case -ENOENT: /* @@ -167,7 +168,8 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, stream->globally_visible = 1; cds_list_del_init(&stream->send_node); - ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); + ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); /* NOLINT sizeof used on a + pointer. */ if (ret < 0) { ERR("Consumer write %s stream to pipe %d", stream->metadata_flag ? "metadata" : "data", @@ -214,7 +216,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, int ret, cpu = 0; struct lttng_ust_ctl_consumer_stream *ustream; struct lttng_consumer_stream *stream; - pthread_mutex_t *current_stream_lock = NULL; + pthread_mutex_t *current_stream_lock = nullptr; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); @@ -302,7 +304,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } } pthread_mutex_unlock(&stream->lock); - current_stream_lock = NULL; + current_stream_lock = nullptr; } return 0; @@ -539,7 +541,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* Tell sessiond there is no more stream. */ - ret = lttng_ust_ctl_send_stream_to_sessiond(sock, NULL); + ret = lttng_ust_ctl_send_stream_to_sessiond(sock, nullptr); if (ret < 0) { goto error; } @@ -665,7 +667,7 @@ static int flush_channel(uint64_t chan_key) DBG("UST consumer flush channel key %" PRIu64, chan_key); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(chan_key); if (!channel) { ERR("UST consumer flush channel %" PRIu64 " not found", chan_key); @@ -720,7 +722,6 @@ static int flush_channel(uint64_t chan_key) */ sample_and_send_channel_buffer_stats(channel); error: - rcu_read_unlock(); return ret; } @@ -740,7 +741,7 @@ static int clear_quiescent_channel(uint64_t chan_key) DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(chan_key); if (!channel) { ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key); @@ -766,7 +767,6 @@ static int clear_quiescent_channel(uint64_t chan_key) pthread_mutex_unlock(&stream->lock); } error: - rcu_read_unlock(); return ret; } @@ -928,8 +928,8 @@ error: * the stream is still in the local stream list of the channel. This call * will make sure to clean that list. */ - consumer_stream_destroy(metadata->metadata_stream, NULL); - metadata->metadata_stream = NULL; + consumer_stream_destroy(metadata->metadata_stream, nullptr); + metadata->metadata_stream = nullptr; send_streams_error: error_no_stream: end: @@ -957,7 +957,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(!metadata_channel->monitor); @@ -1011,11 +1011,10 @@ error_stream: * Clean up the stream completely because the next snapshot will use a * new metadata stream. */ - consumer_stream_destroy(metadata_stream, NULL); - metadata_channel->metadata_stream = NULL; + consumer_stream_destroy(metadata_stream, nullptr); + metadata_channel->metadata_stream = nullptr; error: - rcu_read_unlock(); return ret; } @@ -1066,7 +1065,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(ctx); ASSERT_RCU_READ_LOCKED(); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; if (relayd_id != (uint64_t) -1ULL) { use_relayd = 1; @@ -1217,7 +1216,6 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); return 0; error_put_subbuf: @@ -1228,7 +1226,6 @@ error_close_stream: consumer_stream_close_output(stream); error_unlock: pthread_mutex_unlock(&stream->lock); - rcu_read_unlock(); return ret; } @@ -1306,7 +1303,7 @@ int lttng_ustconsumer_recv_metadata(int sock, * channel is under a snapshot session type. No need to update * the stream position in that scenario. */ - if (channel->metadata_stream != NULL) { + if (channel->metadata_stream != nullptr) { pthread_mutex_lock(&channel->metadata_stream->lock); metadata_stream_reset_cache_consumed_position(channel->metadata_stream); pthread_mutex_unlock(&channel->metadata_stream->lock); @@ -1368,7 +1365,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int ret_func; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttcomm_consumer_msg msg; - struct lttng_consumer_channel *channel = NULL; + struct lttng_consumer_channel *channel = nullptr; health_code_update(); @@ -1400,7 +1397,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); /* relayd needs RCU read-side lock */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: @@ -1432,7 +1429,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get relayd reference if exists. */ relayd = consumer_find_relayd(index); - if (relayd == NULL) { + if (relayd == nullptr) { DBG("Unable to find relayd %" PRIu64, index); ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } @@ -1455,7 +1452,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { - rcu_read_unlock(); return -ENOSYS; } case LTTNG_CONSUMER_DATA_PENDING: @@ -1495,7 +1491,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_allocate_channel( msg.u.ask_channel.key, msg.u.ask_channel.session_id, - msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL, + msg.u.ask_channel.chunk_id.is_set ? &chunk_id : nullptr, msg.u.ask_channel.pathname, msg.u.ask_channel.name, msg.u.ask_channel.relayd_id, @@ -1876,7 +1872,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, uint64_t key = msg.u.discarded_events.channel_key; DBG("UST consumer discarded events command for session id %" PRIu64, id); - rcu_read_lock(); pthread_mutex_lock(&the_consumer_data.lock); ht = the_consumer_data.stream_list_ht; @@ -1903,7 +1898,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); DBG("UST consumer discarded events command for session id %" PRIu64 ", channel key %" PRIu64, @@ -1932,7 +1926,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, uint64_t key = msg.u.lost_packets.channel_key; DBG("UST consumer lost packets command for session id %" PRIu64, id); - rcu_read_lock(); pthread_mutex_lock(&the_consumer_data.lock); ht = the_consumer_data.stream_list_ht; @@ -1958,7 +1951,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); DBG("UST consumer lost packets command for session id %" PRIu64 ", channel key %" PRIu64, @@ -2131,8 +2123,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value; const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ? msg.u.create_trace_chunk.override_name : - NULL; - struct lttng_directory_handle *chunk_directory_handle = NULL; + nullptr; + struct lttng_directory_handle *chunk_directory_handle = nullptr; /* * The session daemon will only provide a chunk directory file @@ -2172,12 +2164,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret_code = lttng_consumer_create_trace_chunk( - !is_local_trace ? &relayd_id : NULL, + !is_local_trace ? &relayd_id : nullptr, msg.u.create_trace_chunk.session_id, msg.u.create_trace_chunk.chunk_id, (time_t) msg.u.create_trace_chunk.creation_timestamp, chunk_override_name, - msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL, + msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr, chunk_directory_handle); lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; @@ -2192,11 +2184,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int ret; ret_code = lttng_consumer_close_trace_chunk( - msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL, + msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr, msg.u.close_trace_chunk.session_id, msg.u.close_trace_chunk.chunk_id, (time_t) msg.u.close_trace_chunk.close_timestamp, - msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL, + msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr, closed_trace_chunk_path); reply.ret_code = ret_code; reply.path_length = strlen(closed_trace_chunk_path) + 1; @@ -2215,7 +2207,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value; ret_code = lttng_consumer_trace_chunk_exists( - msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL, + msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr, msg.u.trace_chunk_exists.session_id, msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; @@ -2279,7 +2271,7 @@ end_channel_error: { int ret_send_status; - ret_send_status = consumer_send_status_channel(sock, NULL); + ret_send_status = consumer_send_status_channel(sock, nullptr); if (ret_send_status < 0) { /* Stop everything if session daemon can not be notified. */ goto error_fatal; @@ -2295,7 +2287,6 @@ error_fatal: goto end; end: - rcu_read_unlock(); health_code_update(); return ret_func; } @@ -2864,7 +2855,7 @@ static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, subbuffer->buffer.buffer = lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size); - LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL); + LTTNG_ASSERT(subbuffer->buffer.buffer.data != nullptr); end: return ret; } @@ -3212,15 +3203,17 @@ void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht) DBG("UST consumer closing all metadata streams"); - rcu_read_lock(); - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { - health_code_update(); + { + lttng::urcu::read_lock_guard read_lock; - pthread_mutex_lock(&stream->chan->lock); - lttng_ustconsumer_close_metadata(stream->chan); - pthread_mutex_unlock(&stream->chan->lock); + cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + health_code_update(); + + pthread_mutex_lock(&stream->chan->lock); + lttng_ustconsumer_close_metadata(stream->chan); + pthread_mutex_unlock(&stream->chan->lock); + } } - rcu_read_unlock(); } void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)