+
+ health_code_update();
+
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_add_stream_nosignal;
+ }
+
+ health_code_update();
+
+ pthread_mutex_lock(&channel->lock);
+ new_stream = consumer_stream_create(
+ channel,
+ channel->key,
+ fd,
+ channel->name,
+ channel->relayd_id,
+ channel->session_id,
+ channel->trace_chunk,
+ msg.u.stream.cpu,
+ &alloc_ret,
+ channel->type,
+ channel->monitor);
+ if (new_stream == NULL) {
+ switch (alloc_ret) {
+ case -ENOMEM:
+ case -EINVAL:
+ default:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ break;
+ }
+ pthread_mutex_unlock(&channel->lock);
+ goto error_add_stream_nosignal;
+ }
+
+ new_stream->wait_fd = fd;
+ ret = kernctl_get_max_subbuf_size(new_stream->wait_fd,
+ &new_stream->max_sb_size);
+ if (ret < 0) {
+ pthread_mutex_unlock(&channel->lock);
+ ERR("Failed to get kernel maximal subbuffer size");
+ goto error_add_stream_nosignal;
+ }
+
+ consumer_stream_update_channel_attributes(new_stream,
+ channel);
+
+ /*
+ * We've just assigned the channel to the stream so increment the
+ * refcount right now. We don't need to increment the refcount for
+ * streams in no monitor because we handle manually the cleanup of
+ * those. It is very important to make sure there is NO prior
+ * consumer_del_stream() calls or else the refcount will be unbalanced.
+ */
+ if (channel->monitor) {
+ uatomic_inc(&new_stream->chan->refcount);
+ }
+
+ /*
+ * The buffer flush is done on the session daemon side for the kernel
+ * so no need for the stream "hangup_flush_done" variable to be
+ * tracked. This is important for a kernel stream since we don't rely
+ * on the flush state of the stream to read data. It's not the case for
+ * user space tracing.
+ */
+ new_stream->hangup_flush_done = 0;
+
+ health_code_update();
+
+ pthread_mutex_lock(&new_stream->lock);
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(new_stream);
+ if (ret < 0) {
+ pthread_mutex_unlock(&new_stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+ consumer_stream_free(new_stream);
+ goto error_add_stream_nosignal;
+ }
+ }
+ health_code_update();
+
+ if (new_stream->metadata_flag) {
+ channel->metadata_stream = new_stream;
+ }
+
+ /* Do not monitor this stream. */
+ if (!channel->monitor) {
+ DBG("Kernel consumer add stream %s in no monitor mode with "
+ "relayd id %" PRIu64, new_stream->name,
+ new_stream->net_seq_idx);
+ cds_list_add(&new_stream->send_node, &channel->streams.head);
+ pthread_mutex_unlock(&new_stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+ goto end_add_stream;
+ }
+
+ /* Send stream to relayd if the stream has an ID. */
+ if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_stream(new_stream,
+ new_stream->chan->pathname);
+ if (ret < 0) {
+ pthread_mutex_unlock(&new_stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+ consumer_stream_free(new_stream);
+ goto error_add_stream_nosignal;
+ }
+
+ /*
+ * If adding an extra stream to an already
+ * existing channel (e.g. cpu hotplug), we need
+ * to send the "streams_sent" command to relayd.
+ */
+ if (channel->streams_sent_to_relayd) {
+ ret = consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret < 0) {
+ pthread_mutex_unlock(&new_stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+ goto error_add_stream_nosignal;
+ }
+ }
+ }
+ pthread_mutex_unlock(&new_stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+
+ /* Get the right pipe where the stream will be sent. */
+ if (new_stream->metadata_flag) {
+ consumer_add_metadata_stream(new_stream);
+ stream_pipe = ctx->consumer_metadata_pipe;
+ } else {
+ consumer_add_data_stream(new_stream);
+ stream_pipe = ctx->consumer_data_pipe;
+ }
+
+ /* Visible to other threads */
+ new_stream->globally_visible = 1;
+
+ health_code_update();
+
+ ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
+ if (ret < 0) {
+ ERR("Consumer write %s stream to pipe %d",
+ new_stream->metadata_flag ? "metadata" : "data",
+ lttng_pipe_get_writefd(stream_pipe));
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
+ goto error_add_stream_nosignal;
+ }
+
+ DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
+ new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
+end_add_stream:
+ break;
+error_add_stream_nosignal:
+ goto end_nosignal;
+error_add_stream_fatal:
+ goto error_fatal;