*
* Returns 0 on success, < 0 on error
*/
-static
-int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
+static int send_relayd_stream(struct lttng_consumer_stream *stream,
+ char *path)
{
- struct consumer_relayd_sock_pair *relayd;
int ret = 0;
- char *stream_path;
+ const char *stream_path;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+ assert(stream->net_seq_idx != -1ULL);
if (path != NULL) {
stream_path = path;
} else {
stream_path = stream->chan->pathname;
}
+
/* The stream is not metadata. Get relayd reference if exists. */
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_add_stream(&relayd->control_sock,
- stream->name, stream_path,
- &stream->relayd_stream_id,
- stream->chan->tracefile_size,
- stream->chan->tracefile_count);
+ ret = relayd_add_stream(&relayd->control_sock, stream->name,
+ stream_path, &stream->relayd_stream_id,
+ stream->chan->tracefile_size, stream->chan->tracefile_count);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
goto end;
}
uatomic_inc(&relayd->refcount);
- } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
- ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
- stream->net_seq_idx);
+ } else {
+ ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+ stream->key, stream->net_seq_idx);
ret = -1;
goto end;
}
+ DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+ stream->name, stream->key, stream->net_seq_idx);
+
end:
rcu_read_unlock();
return ret;
/*
* Find a relayd and close the stream
*/
-static
-void close_relayd_stream(struct lttng_consumer_stream *stream)
+static void close_relayd_stream(struct lttng_consumer_stream *stream)
{
struct consumer_relayd_sock_pair *relayd;
/* The stream is not metadata. Get relayd reference if exists. */
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
+ if (relayd) {
consumer_stream_relayd_close(stream, relayd);
}
rcu_read_unlock();
ERR("sending stream to relayd");
goto end_unlock;
}
- DBG("Stream %s sent to the relayd", stream->name);
} else {
ret = utils_create_stream_file(path, stream->name,
stream->chan->tracefile_size, stream->tracefile_count_current,