consumer_stream_destroy(stream, ht);
}
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, metadata_ht);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
/*
* Add a stream to the global list protected by a mutex.
*/
-static int add_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
+ struct lttng_ht *ht = data_ht;
int ret = 0;
assert(stream);
return ret;
}
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
+ struct lttng_ht *ht = metadata_ht;
int ret = 0;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
- ret = add_metadata_stream(stream, metadata_ht);
- if (ret) {
- ERR("Unable to add metadata stream");
- /* Stream was not setup properly. Continuing. */
- consumer_del_metadata_stream(stream, NULL);
- continue;
- }
-
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
LPOLLIN | LPOLLPRI);
continue;
}
- ret = add_stream(new_stream, data_ht);
- if (ret) {
- ERR("Consumer add stream %" PRIu64 " failed. Continuing",
- new_stream->key);
- /*
- * At this point, if the add_stream fails, it is not in the
- * hash table thus passing the NULL value here.
- */
- consumer_del_stream(new_stream, NULL);
- }
-
/* Continue to update the local streams and handle prio ones */
continue;
}
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
unsigned long produced_pos, uint64_t max_stream_size);
+int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
#endif /* LIB_CONSUMER_H */
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
+ /* Vitible to other threads */
+ new_stream->globally_visible = 1;
+
ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
- consumer_stream_free(new_stream);
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
goto end_nosignal;
}
- /* Successfully sent to the right thread. */
- new_stream->globally_visible = 1;
DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
new_stream->name, fd, new_stream->relayd_stream_id);
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
+ /*
+ * From this point on, the stream's ownership has been moved away from
+ * the channel and becomes globally visible.
+ */
+ stream->globally_visible = 1;
+
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
+ if (stream->metadata_flag) {
+ consumer_del_stream_for_metadata(stream);
+ } else {
+ consumer_del_stream_for_data(stream);
+ }
}
-
+error:
return ret;
}
* If we are unable to send the stream to the thread, there is
* a big problem so just stop everything.
*/
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);
goto error;
}
/* Remove node from the channel stream list. */
cds_list_del(&stream->send_node);
- /*
- * From this point on, the stream's ownership has been moved away from
- * the channel and becomes globally visible.
- */
- stream->globally_visible = 1;
}
error: