#include <common/string-utils/format.h>
#include <common/dynamic-array.h>
-struct lttng_consumer_global_data consumer_data = {
- .stream_count = 0,
- .need_update = 1,
- .type = LTTNG_CONSUMER_UNKNOWN,
+struct lttng_consumer_global_data the_consumer_data = {
+ .stream_count = 0,
+ .need_update = 1,
+ .type = LTTNG_CONSUMER_UNKNOWN,
};
enum consumer_channel_action {
static const char *get_consumer_domain(void)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return DEFAULT_KERNEL_TRACE_DIR;
case LTTNG_CONSUMER64_UST:
return NULL;
}
- lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
+ lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
channel = caa_container_of(node, struct lttng_consumer_channel, node);
struct lttng_consumer_channel *channel =
caa_container_of(node, struct lttng_consumer_channel, node);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
DBG("Consumer destroy and close relayd socket pair");
iter.iter.node = &relayd->node.node;
- ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
+ ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
if (ret != 0) {
/* We assume the relayd is being or is destroyed */
return;
DBG("Consumer delete channel key %" PRIu64, channel->key);
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&channel->lock);
/* Destroy streams that might have been left in the stream list. */
consumer_timer_monitor_stop(channel);
}
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
rcu_read_lock();
iter.iter.node = &channel->node.node;
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
assert(!ret);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
- ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
+ ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht,
&iter);
assert(!ret);
rcu_read_unlock();
call_rcu(&channel->node.head, free_channel_rcu);
end:
pthread_mutex_unlock(&channel->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
}
/*
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
- node.node) {
+ cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
+ relayd, node.node) {
consumer_destroy_relayd(relayd);
}
rcu_read_unlock();
- lttng_ht_destroy(consumer_data.relayd_ht);
+ lttng_ht_destroy(the_consumer_data.relayd_ht);
}
/*
DBG3("Adding consumer stream %" PRIu64, stream->key);
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
+ lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht,
&stream->node_channel_id);
/*
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
+ lttng_ht_add_u64(the_consumer_data.stream_list_ht,
+ &stream->node_session_id);
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
}
/* Update consumer data once the node is inserted. */
- consumer_data.stream_count++;
- consumer_data.need_update = 1;
+ the_consumer_data.stream_count++;
+ the_consumer_data.need_update = 1;
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
}
/*
assert(relayd);
- lttng_ht_lookup(consumer_data.relayd_ht,
- &relayd->net_seq_idx, &iter);
+ lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx,
+ &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
goto end;
}
- lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
+ lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
end:
return ret;
goto error;
}
- lttng_ht_lookup(consumer_data.relayd_ht, &key,
- &iter);
+ lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
assert(stream->metadata_flag);
assert(stream->chan->trace_chunk);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
/*
* Reset the position of what has been read from the
if (chunk_id) {
trace_chunk = lttng_trace_chunk_registry_find_chunk(
- consumer_data.chunk_registry, session_id,
+ the_consumer_data.chunk_registry, session_id,
*chunk_id);
if (!trace_chunk) {
ERR("Failed to find trace chunk reference during creation of channel");
int consumer_add_channel(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
{
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&channel->timer_lock);
steal_channel_key(channel->key);
rcu_read_lock();
- lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
- lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
+ lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
+ lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
&channel->channels_by_session_id_ht_node);
rcu_read_unlock();
channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
- node.node) {
+ cds_lfht_for_each_entry(the_consumer_data.channel_ht->ht, &iter.iter,
+ channel, node.node) {
consumer_del_channel(channel);
}
rcu_read_unlock();
- lttng_ht_destroy(consumer_data.channel_ht);
- lttng_ht_destroy(consumer_data.channels_by_session_id_ht);
+ lttng_ht_destroy(the_consumer_data.channel_ht);
+ lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
cleanup_relayd_ht();
- lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
+ lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);
/*
* This HT contains streams that are freed by either the metadata thread or
* the data thread so we do *nothing* on the hash table and simply destroy
* it.
*/
- lttng_ht_destroy(consumer_data.stream_list_ht);
+ lttng_ht_destroy(the_consumer_data.stream_list_ht);
/*
* Trace chunks in the registry may still exist if the session
* to hit.
*/
trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
- consumer_data.chunk_registry);
+ the_consumer_data.chunk_registry);
if (trace_chunks_left) {
ERR("%u trace chunks are leaked by lttng-consumerd. "
"This can be caused by an internal error of the session daemon.",
}
/* Run all callbacks freeing each chunk. */
rcu_barrier();
- lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
+ lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
}
/*
int ret;
struct lttng_consumer_local_data *ctx;
- assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
- consumer_data.type == type);
- consumer_data.type = type;
+ assert(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
+ the_consumer_data.type == type);
+ the_consumer_data.type = type;
ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
int *splice_pipe;
unsigned int relayd_hang_up = 0;
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
*/
int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_sample_snapshot_positions(stream);
case LTTNG_CONSUMER32_UST:
*/
int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_take_snapshot(stream);
case LTTNG_CONSUMER32_UST:
int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_get_produced_snapshot(stream, pos);
case LTTNG_CONSUMER32_UST:
int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_get_consumed_snapshot(stream, pos);
case LTTNG_CONSUMER32_UST:
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
case LTTNG_CONSUMER32_UST:
static
void lttng_consumer_close_all_metadata(void)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
/*
* The Kernel consumer has a different metadata scheme so we don't
DBG3("Consumer delete metadata stream %d", stream->wait_fd);
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
/*
* Note that this assumes that a stream's channel is never changed and
* that the stream's lock doesn't need to be taken to sample its
}
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&channel->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
if (free_channel) {
consumer_del_channel(channel);
DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
- &stream->node_channel_id);
+ lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht,
+ &stream->node_channel_id);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
+ lttng_ht_add_u64(the_consumer_data.stream_list_ht,
+ &stream->node_session_id);
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
}
/*
}
} else if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Metadata fd %d is hup|err.", pollfd);
- if (!stream->hangup_flush_done
- && (consumer_data.type == LTTNG_CONSUMER32_UST
- || consumer_data.type == LTTNG_CONSUMER64_UST)) {
+ if (!stream->hangup_flush_done &&
+ (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
+ the_consumer_data.type ==
+ LTTNG_CONSUMER64_UST)) {
DBG("Attempting to flush and consume the UST buffers");
lttng_ustconsumer_on_stream_hangup(stream);
* the fds set has been updated, we need to update our
* local array as well
*/
- pthread_mutex_lock(&consumer_data.lock);
- if (consumer_data.need_update) {
+ pthread_mutex_lock(&the_consumer_data.lock);
+ if (the_consumer_data.need_update) {
free(pollfd);
pollfd = NULL;
local_stream = NULL;
/* Allocate for all fds */
- pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
+ pollfd = zmalloc((the_consumer_data.stream_count +
+ nb_pipes_fd) *
+ sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
+ local_stream = zmalloc((the_consumer_data.stream_count +
+ nb_pipes_fd) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
ret = update_poll_array(ctx, &pollfd, local_stream,
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
nb_fd = ret;
- consumer_data.need_update = 0;
+ the_consumer_data.need_update = 0;
}
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
/* No FDs and consumer_quit, consumer_cleanup the thread */
if (nb_fd == 0 && nb_inactive_fd == 0 &&
}
if (!local_stream[i]->hangup_flush_done
&& (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
- && (consumer_data.type == LTTNG_CONSUMER32_UST
- || consumer_data.type == LTTNG_CONSUMER64_UST)) {
+ && (the_consumer_data.type == LTTNG_CONSUMER32_UST
+ || the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("fd %d is hup|err|nval. Attempting flush and read.",
pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
- ht = consumer_data.stream_per_chan_id_ht;
+ ht = the_consumer_data.stream_per_chan_id_ht;
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
if (cds_lfht_is_node_deleted(&stream->node.node)) {
goto next;
}
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_on_recv_stream(stream);
case LTTNG_CONSUMER32_UST:
*/
int lttng_consumer_init(void)
{
- consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!consumer_data.channel_ht) {
+ the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!the_consumer_data.channel_ht) {
goto error;
}
- consumer_data.channels_by_session_id_ht =
+ the_consumer_data.channels_by_session_id_ht =
lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!consumer_data.channels_by_session_id_ht) {
+ if (!the_consumer_data.channels_by_session_id_ht) {
goto error;
}
- consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!consumer_data.relayd_ht) {
+ the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!the_consumer_data.relayd_ht) {
goto error;
}
- consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!consumer_data.stream_list_ht) {
+ the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!the_consumer_data.stream_list_ht) {
goto error;
}
- consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!consumer_data.stream_per_chan_id_ht) {
+ the_consumer_data.stream_per_chan_id_ht =
+ lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!the_consumer_data.stream_per_chan_id_ht) {
goto error;
}
goto error;
}
- consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
- if (!consumer_data.chunk_registry) {
+ the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
+ if (!the_consumer_data.chunk_registry) {
goto error;
}
struct consumer_relayd_sock_pair *relayd = NULL;
/* Iterate over all relayd since they are indexed by net_seq_idx. */
- cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
- node.node) {
+ cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
+ relayd, node.node) {
/*
* Check by sessiond id which is unique here where the relayd session
* id might not be when having multiple relayd.
DBG("Consumer data pending command on session id %" PRIu64, id);
rcu_read_lock();
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
data_pending = lttng_kconsumer_data_pending;
break;
}
/* Ease our life a bit */
- ht = consumer_data.stream_list_ht;
+ ht = the_consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&id, lttng_ht_seed),
data_not_pending:
/* Data is available to be read by a viewer. */
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
rcu_read_unlock();
return 0;
data_pending:
/* Data is still being extracted from buffers. */
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
rcu_read_unlock();
return 1;
}
int ret;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
- struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
struct lttng_dynamic_array stream_rotation_positions;
uint64_t next_chunk_id, stream_count = 0;
enum lttng_trace_chunk_status chunk_status;
goto end;
}
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
ret = kernctl_buffer_clear(stream->wait_fd);
if (ret < 0) {
int ret;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
- struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
rcu_read_lock();
}
published_chunk = lttng_trace_chunk_registry_publish_chunk(
- consumer_data.chunk_registry, session_id,
+ the_consumer_data.chunk_registry, session_id,
created_chunk);
lttng_trace_chunk_put(created_chunk);
created_chunk = NULL;
}
rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
- consumer_data.channels_by_session_id_ht->hash_fct(
+ cds_lfht_for_each_entry_duplicate(
+ the_consumer_data.channels_by_session_id_ht->ht,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(
&session_id, lttng_ht_seed),
- consumer_data.channels_by_session_id_ht->match_fct,
+ the_consumer_data.channels_by_session_id_ht->match_fct,
&session_id, &iter.iter, channel,
channels_by_session_id_ht_node.node) {
ret = lttng_consumer_channel_set_trace_chunk(channel,
close_command_name);
chunk = lttng_trace_chunk_registry_find_chunk(
- consumer_data.chunk_registry, session_id, chunk_id);
+ the_consumer_data.chunk_registry, session_id, chunk_id);
if (!chunk) {
ERR("Failed to find chunk: session_id = %" PRIu64
", chunk_id = %" PRIu64,
* current chunk found in the session's channels.
*/
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter,
+ cds_lfht_for_each_entry(the_consumer_data.channel_ht->ht, &iter.iter,
channel, node.node) {
int ret;
", chunk_id = %" PRIu64, relayd_id_str,
chunk_id);
ret = lttng_trace_chunk_registry_chunk_exists(
- consumer_data.chunk_registry, session_id,
- chunk_id, &chunk_exists_local);
+ the_consumer_data.chunk_registry, session_id, chunk_id,
+ &chunk_exists_local);
if (ret) {
/* Internal error. */
ERR("Failed to query the existence of a trace chunk");
struct lttng_ht_iter iter;
int ret;
- ht = consumer_data.stream_per_chan_id_ht;
+ ht = the_consumer_data.stream_per_chan_id_ht;
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,