return outfd;
}
-/*
- * Update a stream according to what we just received.
- */
-void consumer_change_stream_state(int stream_key,
- enum lttng_consumer_stream_state state)
-{
- struct lttng_consumer_stream *stream;
-
- pthread_mutex_lock(&consumer_data.lock);
- stream = consumer_find_stream(stream_key, consumer_data.stream_ht);
- if (stream) {
- stream->state = state;
- }
- consumer_data.need_update = 1;
- pthread_mutex_unlock(&consumer_data.lock);
-}
-
static
void consumer_free_channel(struct rcu_head *head)
{
rcu_read_lock();
- /*
- * close all outfd. Called when there are no more threads running (after
- * joining on the threads), no need to protect list iteration with mutex.
- */
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
- node) {
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
- consumer_del_stream(stream, consumer_data.stream_ht);
- }
-
cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
node) {
struct lttng_consumer_channel *channel =
rcu_read_unlock();
- lttng_ht_destroy(consumer_data.stream_ht);
lttng_ht_destroy(consumer_data.channel_ht);
}
*/
void lttng_consumer_init(void)
{
- consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
* uniquely a stream.
*/
struct lttng_consumer_stream {
+ /* Hash table node for both metadata and data type */
struct lttng_ht_node_ulong node;
- struct lttng_ht_node_ulong waitfd_node;
struct lttng_consumer_channel *chan; /* associated channel */
/*
* key is the key used by the session daemon to refer to the
pthread_mutex_t lock;
/*
- * Number of streams in the hash table. Protected by consumer_data.lock.
+ * Number of streams in the data stream hash table declared outside.
+ * Protected by consumer_data.lock.
*/
int stream_count;
- /*
- * Hash tables of streams and channels. Protected by consumer_data.lock.
- */
- struct lttng_ht *stream_ht;
+
+ /* Channel hash table protected by consumer_data.lock. */
struct lttng_ht *channel_ht;
/*
* Flag specifying if the local array of FDs needs update in the
struct lttng_ht *ht);
extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
-extern void consumer_change_stream_state(int stream_key,
- enum lttng_consumer_stream_state state);
extern void consumer_del_channel(struct lttng_consumer_channel *channel);
extern struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
{
rcu_read_unlock();
return -ENOSYS;
-#if 0
- if (ctx->on_update_stream != NULL) {
- ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
- if (ret == 0) {
- consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
- } else if (ret < 0) {
- goto end;
- }
- } else {
- consumer_change_stream_state(msg.u.stream.stream_key,
- msg.u.stream.state);
- }
- break;
-#endif
}
default:
break;