PERROR("fcntl session fd");
}
+ lks->id = session->id;
lks->consumer_fds_sent = 0;
session->kernel_session = lks;
*/
struct consumer_output *consumer;
struct consumer_output *tmp_consumer;
+ /* Tracing session id */
+ unsigned int id;
};
/*
gid_t gid,
int net_index,
int metadata_flag,
+ uint64_t session_id,
int *alloc_ret)
{
struct lttng_consumer_stream *stream;
stream->gid = gid;
stream->net_seq_idx = net_index;
stream->metadata_flag = metadata_flag;
+ stream->session_id = session_id;
strncpy(stream->path_name, path_name, sizeof(stream->path_name));
stream->path_name[sizeof(stream->path_name) - 1] = '\0';
+ pthread_mutex_init(&stream->lock, NULL);
/*
* Index differently the metadata node because the thread is using an
lttng_ht_node_init_ulong(&stream->node, stream->key);
}
+ /* Init session id node with the stream session id */
+ lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+
/*
* The cpu number is needed before using any ustctl_* actions. Ignored for
* the kernel so the value does not matter.
pthread_mutex_unlock(&consumer_data.lock);
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
- " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
- stream->shm_fd, stream->wait_fd,
+ " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
+ stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
- stream->net_seq_idx);
+ stream->net_seq_idx, stream->session_id);
return stream;
error:
{
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
assert(metadata_ht);
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
+ /* Return to the sessiond if there is data pending for a session */
+ LTTNG_CONSUMER_DATA_AVAILABLE,
};
/* State of each fd in consumer */
* uniquely a stream.
*/
struct lttng_consumer_stream {
- /* Hash table node for both metadata and data type */
+ /* HT node used by the data_ht and metadata_ht */
struct lttng_ht_node_ulong node;
+ /* HT node used in consumer_data.stream_list_ht */
+ struct lttng_ht_node_ulong node_session_id;
struct lttng_consumer_channel *chan; /* associated channel */
/*
* key is the key used by the session daemon to refer to the
uint64_t relayd_stream_id;
/* Next sequence number to use for trace packet */
uint64_t next_net_seq_num;
+ /* Lock to use the stream FDs since they are used between threads. */
+ pthread_mutex_t lock;
+ /* Tracing session id */
+ uint64_t session_id;
};
/*
* stream has an index which associate the right relayd socket to use.
*/
struct lttng_ht *relayd_ht;
+
+ /*
+ * This hash table contains all streams (metadata and data) indexed by
+ * session id. In other words, the ht is indexed by session id and each
+ * bucket contains the list of associated streams.
+ *
+ * This HT uses the "node_session_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_list_ht;
};
/* Defined in consumer.c and coupled with explanations */
gid_t gid,
int net_index,
int metadata_flag,
+ uint64_t session_id,
int *alloc_ret);
extern void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
+ msg.u.stream.session_id,
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
goto end_nosignal;
}
+ case LTTNG_CONSUMER_DATA_AVAILABLE:
+ {
+ rcu_read_unlock();
+ return -ENOSYS;
+ }
default:
goto end_nosignal;
}
int net_index;
unsigned int metadata_flag;
char name[LTTNG_SYMBOL_NAME_LEN]; /* Name string of the stream */
+ uint64_t session_id; /* Tracing session id of the stream */
} stream;
struct {
int net_index;
struct {
uint64_t net_seq_idx;
} destroy_relayd;
+ struct {
+ uint64_t session_id;
+ } data_available;
} u;
};
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
+ msg.u.stream.session_id,
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
rcu_read_unlock();
return -ENOSYS;
}
+ case LTTNG_CONSUMER_DATA_AVAILABLE:
+ {
+ rcu_read_unlock();
+ return -ENOSYS;
+ }
default:
break;
}