/* Send relayd socket to consumer. */
ret = consumer_send_relayd_socket(consumer_sock, sock,
- consumer, relayd_uri->stype);
+ consumer, relayd_uri->stype, session->id);
if (ret < 0) {
ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL;
goto close_sock;
*/
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
- enum lttng_stream_type type)
+ enum lttng_stream_type type, unsigned int session_id)
{
int ret;
struct lttcomm_consumer_msg msg;
*/
msg.u.relayd_sock.net_index = consumer->net_seq_index;
msg.u.relayd_sock.type = type;
+ msg.u.relayd_sock.session_id = session_id;
memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
struct lttcomm_consumer_msg *msg);
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
- enum lttng_stream_type type);
+ enum lttng_stream_type type, unsigned int session_id);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
struct consumer_output *consumer);
int consumer_recv_status_reply(struct consumer_socket *sock);
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
+/*
+ * This hash table contains the mapping between the session id of the sessiond
+ * and the relayd session id. Element of the ht are indexed by sessiond_id.
+ *
+ * Node can be added when a relayd communication is opened in the sessiond
+ * thread.
+ *
+ * Note that a session id of the session daemon is unique to a tracing session
+ * and not to a domain session. However, a domain session has one consumer
+ * which forces the 1-1 mapping between a consumer and a domain session (ex:
+ * UST). This means that we can't have duplicate in this ht.
+ */
+static struct lttng_ht *relayd_session_id_ht;
+
/*
* Notify a thread pipe to poll back again. This usually means that some global
* state has changed so we just send back the thread in a poll wait call.
{
int ret;
struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
if (relayd == NULL) {
return;
DBG("Consumer destroy and close relayd socket pair");
+ lttng_ht_lookup(relayd_session_id_ht,
+ (void *)((unsigned long) relayd->session_id), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ /* We assume the relayd is being or is destroyed */
+ return;
+ }
+
+ ret = lttng_ht_del(relayd_session_id_ht, &iter);
+ if (ret != 0) {
+ /* We assume the relayd is being or is destroyed */
+ return;
+ }
+
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
if (ret != 0) {
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
/*
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id)
{
int fd = -1, ret = -1;
enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_session_id *relayd_id_node;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
goto error;
}
+ /* Set up a relayd session id node. */
+ relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
+ if (!relayd_id_node) {
+ PERROR("zmalloc relayd id node");
+ goto error;
+ }
+
+ relayd_id_node->relayd_id = relayd->session_id;
+ relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
+
+ /* Indexed by session id of the session daemon. */
+ lttng_ht_node_init_ulong(&relayd_id_node->node,
+ relayd_id_node->sessiond_id);
+ rcu_read_lock();
+ lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
+ rcu_read_unlock();
+
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
struct lttng_ht *stream_list_ht;
};
+/*
+ * Session id mapping structure storred in relayd_session_id_ht.
+ */
+struct consumer_relayd_session_id {
+ uint64_t sessiond_id;
+ uint64_t relayd_id;
+ struct lttng_ht_node_ulong node;
+};
+
/*
* Init consumer data structures.
*/
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
/* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock);
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
enum lttng_stream_type type;
/* Open socket to the relayd */
struct lttcomm_sock sock;
+ /* Tracing session id associated to the relayd. */
+ uint64_t session_id;
} relayd_sock;
struct {
uint64_t net_seq_idx;
/* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock);
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL: