*/
volatile int consumer_quit = 0;
+/*
+ * The following two hash tables are visible by all threads which are separated
+ * in different source files.
+ *
+ * Global hash table containing respectively metadata and data streams. The
+ * stream element in this ht should only be updated by the metadata poll thread
+ * for the metadata and the data poll thread for the data.
+ */
+struct lttng_ht *metadata_ht = NULL;
+struct lttng_ht *data_ht = NULL;
+
/*
* Find a stream. The consumer_data.lock must be locked during this
* call.
/*
* Add a stream to the global list protected by a mutex.
*/
-int consumer_add_stream(struct lttng_consumer_stream *stream)
+static int consumer_add_stream(struct lttng_consumer_stream *stream,
+ struct lttng_ht *ht)
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
+ assert(ht);
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
rcu_read_lock();
- lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+ /* Steal stream identifier to avoid having streams with the same key */
+ consumer_steal_stream_key(stream->key, ht);
+
+ lttng_ht_add_unique_ulong(ht, &stream->node);
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
*
* Returns the number of fds in the structures.
*/
-int consumer_update_poll_array(
+static int consumer_update_poll_array(
struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
- struct lttng_consumer_stream **local_stream)
+ struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
{
int i = 0;
struct lttng_ht_iter iter;
DBG("Updating poll fd array");
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
- node.node) {
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
continue;
}
}
}
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ *
+ * WARNING: *MUST* be used with data stream only.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ if (ht == NULL) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+
+ call_rcu(&stream->node.head, consumer_free_stream);
+ }
+ rcu_read_unlock();
+
+ lttng_ht_destroy(ht);
+}
+
/*
* Iterate over all streams of the hashtable and free them properly.
*
uatomic_dec(&stream->chan->nb_init_streams);
}
+ /* Steal stream identifier to avoid having streams with the same key */
+ consumer_steal_stream_key(stream->key, ht);
+
lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
rcu_read_unlock();
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
- struct lttng_ht *metadata_ht = NULL;
struct lttng_poll_event events;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
DBG("Thread metadata poll started");
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- if (metadata_ht == NULL) {
- goto end;
- }
-
/* Size is set to 1 for the consumer_metadata pipe */
ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
if (ret < 0) {
rcu_register_thread();
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (data_ht == NULL) {
+ goto end;
+ }
+
local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
+ ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+ data_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
continue;
}
- ret = consumer_add_stream(new_stream);
+ ret = consumer_add_stream(new_stream, data_ht);
if (ret) {
ERR("Consumer add stream %d failed. Continuing",
new_stream->key);
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
}
*/
close(ctx->consumer_metadata_pipe[1]);
+ if (data_ht) {
+ destroy_data_stream_ht(data_ht);
+ }
+
rcu_unregister_thread();
return NULL;
}
consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ assert(metadata_ht);
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ assert(data_ht);
}
/*