/* Data path for network streaming. */
struct lttng_uri data;
+
+ /* Flag if network sockets were sent to the consumer. */
+ unsigned int relayd_socks_sent;
};
/*
/*
* The net_seq_index is the index of the network stream on the consumer
- * side. It's basically the relayd socket file descriptor value so the
- * consumer can identify which streams goes with which socket.
+ * side. It tells the consumer which streams goes to which relayd with this
+ * index. The relayd sockets are index with it on the consumer side.
*/
int net_seq_index;
* lock. This means that there CAN NOT be stream(s) being sent to a
* consumer since this action also requires the session lock at any time.
*
- * At this point, we are sure that not streams data will be lost after this
- * command is issued.
+ * At this point, we are sure that no data will be lost after this command
+ * is issued.
*/
if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) {
cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket,
{
int ret;
+ assert(session);
+ assert(consumer);
+
+ /* Don't resend the sockets to the consumer. */
+ if (consumer->dst.net.relayd_socks_sent) {
+ ret = LTTCOMM_OK;
+ goto error;
+ }
+
/* Sending control relayd socket. */
ret = send_socket_relayd_consumer(domain, session,
&consumer->dst.net.control, consumer, fd);
goto error;
}
+ /* Flag that all relayd sockets were sent to the consumer. */
+ consumer->dst.net.relayd_socks_sent = 1;
+
error:
return ret;
}
session_error:
session_destroy(session);
error:
+ rcu_read_lock();
consumer_destroy_output(consumer);
+ rcu_read_unlock();
consumer_error:
return ret;
}
case LTTNG_DOMAIN_KERNEL:
/* Code flow error if we don't have a kernel session here. */
assert(ksess);
- assert(ksess->consumer);
/* Create consumer output if none exists */
consumer = ksess->tmp_consumer;
goto error;
}
- /* Don't send relayd socket if URI is NOT remote */
- if (uris[i].dtype == LTTNG_DST_PATH) {
+ /*
+ * Don't send relayd socket if URI is NOT remote or if the relayd
+ * sockets for the session are already sent.
+ */
+ if (uris[i].dtype == LTTNG_DST_PATH ||
+ consumer->dst.net.relayd_socks_sent) {
continue;
}
* session without this lock hence freeing the consumer output object
* is valid.
*/
+ rcu_read_lock();
consumer_destroy_output(ksess->consumer);
+ rcu_read_unlock();
ksess->consumer = consumer;
ksess->tmp_consumer = NULL;
* session without this lock hence freeing the consumer output object
* is valid.
*/
+ rcu_read_lock();
consumer_destroy_output(usess->consumer);
+ rcu_read_unlock();
usess->consumer = consumer;
usess->tmp_consumer = NULL;
destroy_domain_pid(session->domain_pid);
destroy_domain_exec(session->domain_exec);
+ consumer_destroy_output(session->consumer);
+ consumer_destroy_output(session->tmp_consumer);
+
free(session);
rcu_read_unlock();
uatomic_dec(&relayd->refcount);
assert(uatomic_read(&relayd->refcount) >= 0);
+ /* Closing streams requires to lock the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_close_stream(&relayd->control_sock,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
DBG("Unable to close stream on the relayd. Continuing");
/*
}
/*
- * Add relayd socket to global consumer data hashtable.
+ * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
+ * be acquired before calling this.
*/
+
int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
goto end;
}
- rcu_read_lock();
-
lttng_ht_lookup(consumer_data.relayd_ht,
(void *)((unsigned long) relayd->net_seq_idx), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
- rcu_read_unlock();
/* Relayd already exist. Ignore the insertion */
goto end;
}
lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
- rcu_read_unlock();
-
end:
return ret;
}
/* Poll on consumer socket. */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ rcu_read_unlock();
return ret;
}
msg.u.stream.metadata_flag);
if (new_stream == NULL) {
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
- goto end;
+ goto end_nosignal;
}
/* The stream is not metadata. Get relayd reference if exists. */
&new_stream->relayd_stream_id);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else if (msg.u.stream.net_index != -1) {
ERR("Network sequence index %d unknown. Not adding stream.",
msg.u.stream.net_index);
free(new_stream);
- goto end;
+ goto end_nosignal;
}
if (ctx->on_recv_stream != NULL) {
if (ret == 0) {
consumer_add_stream(new_stream);
} else if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else {
consumer_add_stream(new_stream);
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
- if (ctx->on_update_stream != NULL) {
- ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
- if (ret == 0) {
- consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
- } else if (ret < 0) {
- goto end;
- }
- } else {
- consumer_change_stream_state(msg.u.stream.stream_key,
- msg.u.stream.state);
+ rcu_read_unlock();
+ return -ENOSYS;
+ }
+ case LTTNG_CONSUMER_DESTROY_RELAYD:
+ {
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("Kernel consumer destroying relayd %zu",
+ msg.u.destroy_relayd.net_seq_idx);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
+ if (relayd == NULL) {
+ ERR("Unable to find relayd %zu", msg.u.destroy_relayd.net_seq_idx);
+ goto end_nosignal;
}
- break;
+
+ /* Set destroy flag for this object */
+ uatomic_set(&relayd->destroy_flag, 1);
+
+ /* Destroy the relayd if refcount is 0 else set the destroy flag. */
+ if (uatomic_read(&relayd->refcount) == 0) {
+ consumer_destroy_relayd(relayd);
+ }
+ goto end_nosignal;
}
default:
- break;
+ goto end_nosignal;
}
-end:
+
/*
- * Wake-up the other end by writing a null byte in the pipe
- * (non-blocking). Important note: Because writing into the
- * pipe is non-blocking (and therefore we allow dropping wakeup
- * data, as long as there is wakeup data present in the pipe
- * buffer to wake up the other end), the other end should
- * perform the following sequence for waiting:
+ * Wake-up the other end by writing a null byte in the pipe (non-blocking).
+ * Important note: Because writing into the pipe is non-blocking (and
+ * therefore we allow dropping wakeup data, as long as there is wakeup data
+ * present in the pipe buffer to wake up the other end), the other end
+ * should perform the following sequence for waiting:
+ *
* 1) empty the pipe (reads).
* 2) perform update operation.
* 3) wait on the pipe (poll).
return -ENOENT;
}
- /* relayd need RCU read-side lock */
+ /* relayd needs RCU read-side lock */
rcu_read_lock();
switch (msg.cmd_type) {
/* Poll on consumer socket. */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
if (ret != sizeof(fds)) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ rcu_read_unlock();
return ret;
}
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
if (ret != sizeof(fds)) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ rcu_read_unlock();
return ret;
}
msg.u.stream.metadata_flag);
if (new_stream == NULL) {
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
- goto end;
+ goto end_nosignal;
}
/* The stream is not metadata. Get relayd reference if exists. */
&new_stream->relayd_stream_id);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else if (msg.u.stream.net_index != -1) {
ERR("Network sequence index %d unknown. Not adding stream.",
msg.u.stream.net_index);
free(new_stream);
- goto end;
+ goto end_nosignal;
}
if (ctx->on_recv_stream != NULL) {
if (ret == 0) {
consumer_add_stream(new_stream);
} else if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else {
consumer_add_stream(new_stream);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
if (relayd == NULL) {
- ERR("Unable to find relayd %zu",
- msg.u.destroy_relayd.net_seq_idx);
+ ERR("Unable to find relayd %zu", msg.u.destroy_relayd.net_seq_idx);
+ goto end_nosignal;
}
/* Set destroy flag for this object */
if (uatomic_read(&relayd->refcount) == 0) {
consumer_destroy_relayd(relayd);
}
- break;
+ goto end_nosignal;
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
+ rcu_read_unlock();
return -ENOSYS;
#if 0
if (ctx->on_update_stream != NULL) {
consumer_change_stream_state(msg.u.stream.stream_key,
msg.u.stream.state);
}
-#endif
break;
+#endif
}
default:
break;
}
-end:
+
/*
- * Wake-up the other end by writing a null byte in the pipe
- * (non-blocking). Important note: Because writing into the
- * pipe is non-blocking (and therefore we allow dropping wakeup
- * data, as long as there is wakeup data present in the pipe
- * buffer to wake up the other end), the other end should
- * perform the following sequence for waiting:
+ * Wake-up the other end by writing a null byte in the pipe (non-blocking).
+ * Important note: Because writing into the pipe is non-blocking (and
+ * therefore we allow dropping wakeup data, as long as there is wakeup data
+ * present in the pipe buffer to wake up the other end), the other end
+ * should perform the following sequence for waiting:
+ *
* 1) empty the pipe (reads).
* 2) perform update operation.
* 3) wait on the pipe (poll).