assert(stream);
assert(relayd);
- uatomic_dec(&relayd->refcount);
- assert(uatomic_read(&relayd->refcount) >= 0);
+ if (stream->sent_to_relayd) {
+ uatomic_dec(&relayd->refcount);
+ assert(uatomic_read(&relayd->refcount) >= 0);
+ }
/* Closing streams requires to lock the control socket. */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
consumer_destroy_relayd(relayd);
}
stream->net_seq_idx = (uint64_t) -1ULL;
+ stream->sent_to_relayd = 0;
}
/*
struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
assert(stream);
assert(ht);
*/
lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
- /* Check and cleanup relayd */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
goto end;
}
uatomic_inc(&relayd->refcount);
+ stream->sent_to_relayd = 1;
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
stream->key, stream->net_seq_idx);
struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
node = lttng_ht_iter_get_node_u64(&iter);
assert(!node);
- /* Find relayd and, if one is found, increment refcount. */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
gid_t gid;
/* Network sequence number. Indicating on which relayd socket it goes. */
uint64_t net_seq_idx;
+ /*
+ * Indicate if this stream was successfully sent to a relayd. This is set
+ * after the refcount of the relayd is incremented and is checked when the
+ * stream is closed before decrementing the refcount in order to avoid an
+ * unbalanced state.
+ */
+ unsigned int sent_to_relayd;
+
/* Identify if the stream is the metadata */
unsigned int metadata_flag;
/* Used when the stream is set for network streaming */
return ret;
}
-/*
- * Search for a relayd object related to the stream. If found, send the stream
- * to the relayd.
- *
- * On success, returns 0 else a negative value.
- */
-static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
-{
- int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
-
- assert(stream);
-
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- /* Add stream on the relayd */
- ret = relayd_add_stream(&relayd->control_sock, stream->name,
- stream->chan->pathname, &stream->relayd_stream_id,
- stream->chan->tracefile_size,
- stream->chan->tracefile_count);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- goto error;
- }
- } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
- ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
- stream->net_seq_idx);
- ret = -1;
- goto error;
- }
-
-error:
- return ret;
-}
-
/*
* Create streams for the given channel using liblttng-ust-ctl.
*
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
/* Try to send the stream to the relayd if one is available. */
- ret = send_stream_to_relayd(stream);
+ ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
if (ret < 0) {
/*
* Flag that the relayd was the problem here probably due to a
}
/* Send metadata stream to relayd if needed. */
- ret = send_stream_to_relayd(metadata->metadata_stream);
+ ret = consumer_send_relayd_stream(metadata->metadata_stream,
+ metadata->pathname);
if (ret < 0) {
ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
goto error;