*/
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
- struct lttng_consumer_stream *stream, *stmp;
-
LTTNG_ASSERT(channel);
/* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
consumer_stream_destroy(stream, nullptr);
}
}
static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
{
- int ret;
- struct lttng_consumer_stream *stream;
-
const lttng::urcu::read_lock_guard read_lock;
- pthread_mutex_lock(&channel->lock);
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ const lttng::pthread::lock_guard channel_lock(channel->lock);
+
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
- pthread_mutex_lock(&stream->lock);
- ret = consumer_clear_stream(stream);
+
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
+ const auto ret = consumer_clear_stream(stream);
if (ret) {
- goto error_unlock;
+ return ret;
}
- pthread_mutex_unlock(&stream->lock);
}
- pthread_mutex_unlock(&channel->lock);
- return 0;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&channel->lock);
- return ret;
+ return 0;
}
/*
enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
- struct lttng_consumer_stream *stream;
enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
if (channel->metadata_stream) {
ERR("Open channel packets command attempted on a metadata channel");
- ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
- goto end;
+ return LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
}
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
- enum consumer_stream_open_packet_status status;
-
- pthread_mutex_lock(&stream->lock);
- if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
- }
+ const lttng::urcu::read_lock_guard read_lock;
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
+ enum consumer_stream_open_packet_status status;
- status = consumer_stream_open_packet(stream);
- switch (status) {
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
- DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk = true;
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
- DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
- /*
- * Only unexpected internal errors can lead to this
- * failing. Report an unknown error.
- */
- ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
- ", channel id = %" PRIu64 ", channel name = %s"
- ", session id = %" PRIu64,
- stream->key,
- channel->key,
- channel->name,
- channel->session_id);
- ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
- goto error_unlock;
- default:
- abort();
- }
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ continue;
+ }
- next:
- pthread_mutex_unlock(&stream->lock);
+ status = consumer_stream_open_packet(stream);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64 ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key,
+ channel->key,
+ channel->name,
+ channel->session_id);
+ return LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ default:
+ abort();
}
}
-end_rcu_unlock:
-end:
- return ret;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- goto end_rcu_unlock;
+ return ret;
}
void lttng_consumer_sigbus_handle(void *addr)