call_rcu(&stream->node.head, free_stream_rcu);
}
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, metadata_ht);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
/*
* Add a stream to the global list protected by a mutex.
*/
-static int add_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
+ struct lttng_ht *ht = data_ht;
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
return ret;
}
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
+ struct lttng_ht *ht = metadata_ht;
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
- ret = add_metadata_stream(stream, metadata_ht);
- if (ret) {
- ERR("Unable to add metadata stream");
- /* Stream was not setup properly. Continuing. */
- consumer_del_metadata_stream(stream, NULL);
- continue;
- }
-
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
LPOLLIN | LPOLLPRI);
continue;
}
- ret = add_stream(new_stream, data_ht);
- if (ret) {
- ERR("Consumer add stream %" PRIu64 " failed. Continuing",
- new_stream->key);
- /*
- * At this point, if the add_stream fails, it is not in the
- * hash table thus passing the NULL value here.
- */
- consumer_del_stream(new_stream, NULL);
- }
-
/* Continue to update the local streams and handle prio ones */
continue;
}
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
- consumer_del_stream(new_stream, NULL);
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
goto end_nosignal;
}
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
+ if (stream->metadata_flag) {
+ consumer_del_stream_for_metadata(stream);
+ } else {
+ consumer_del_stream_for_data(stream);
+ }
}
-
+error:
return ret;
}
* If we are unable to send the stream to the thread, there is
* a big problem so just stop everything.
*/
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);
goto error;
}