health_code_update();
pthread_mutex_lock(&stream->lock);
+
+ /*
+ * Protect against concurrent teardown of a stream.
+ */
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
if (!stream->quiescent) {
ustctl_flush_buffer(stream->ustream, 0);
stream->quiescent = true;
}
+next:
pthread_mutex_unlock(&stream->lock);
}
error:
{
int ret = 0;
struct lttng_consumer_channel *channel;
+ unsigned int channel_monitor;
DBG("UST consumer close metadata key %" PRIu64, chan_key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
-
+ channel_monitor = channel->monitor;
if (cds_lfht_is_node_deleted(&channel->node.node)) {
goto error_unlock;
}
lttng_ustconsumer_close_metadata(channel);
+ pthread_mutex_unlock(&channel->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
+ /*
+ * The ownership of a metadata channel depends on the type of
+ * session to which it belongs. In effect, the monitor flag is checked
+ * to determine if this metadata channel is in "snapshot" mode or not.
+ *
+ * In the non-snapshot case, the metadata channel is created along with
+ * a single stream which will remain present until the metadata channel
+ * is destroyed (on the destruction of its session). In this case, the
+ * metadata stream in "monitored" by the metadata poll thread and holds
+ * the ownership of its channel.
+ *
+ * Closing the metadata will cause the metadata stream's "metadata poll
+ * pipe" to be closed. Closing this pipe will wake-up the metadata poll
+ * thread which will teardown the metadata stream which, in return,
+ * deletes the metadata channel.
+ *
+ * In the snapshot case, the metadata stream is created and destroyed
+ * on every snapshot record. Since the channel doesn't have an owner
+ * other than the session daemon, it is safe to destroy it immediately
+ * on reception of the CLOSE_METADATA command.
+ */
+ if (!channel_monitor) {
+ /*
+ * The channel and consumer_data locks must be
+ * released before this call since consumer_del_channel
+ * re-acquires the channel and consumer_data locks to teardown
+ * the channel and queue its reclamation by the "call_rcu"
+ * worker thread.
+ */
+ consumer_del_channel(channel);
+ }
+
+ return ret;
error_unlock:
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
stream->name, stream->key);
}
- if (relayd_id != -1ULL) {
- ret = consumer_send_relayd_streams_sent(relayd_id);
- if (ret < 0) {
- goto error_unlock;
- }
- }
/*
* If tracing is active, we want to perform a "full" buffer flush.
produced_pos, nb_packets_per_stream,
stream->max_sb_size);
- while (consumed_pos < produced_pos) {
+ while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
unsigned long len, padded_len;
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
/* Session daemon status message are handled in the following call. */
- ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
&msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
msg.u.relayd_sock.relayd_session_id);
struct ustctl_consumer_stream *ustream)
{
int ret;
+ uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
+ events_discarded, stream_id, stream_instance_id,
+ packet_seq_num;
- ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin);
+ ret = ustctl_get_timestamp_begin(ustream, ×tamp_begin);
if (ret < 0) {
PERROR("ustctl_get_timestamp_begin");
goto error;
}
- index->timestamp_begin = htobe64(index->timestamp_begin);
- ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end);
+ ret = ustctl_get_timestamp_end(ustream, ×tamp_end);
if (ret < 0) {
PERROR("ustctl_get_timestamp_end");
goto error;
}
- index->timestamp_end = htobe64(index->timestamp_end);
- ret = ustctl_get_events_discarded(ustream, &index->events_discarded);
+ ret = ustctl_get_events_discarded(ustream, &events_discarded);
if (ret < 0) {
PERROR("ustctl_get_events_discarded");
goto error;
}
- index->events_discarded = htobe64(index->events_discarded);
- ret = ustctl_get_content_size(ustream, &index->content_size);
+ ret = ustctl_get_content_size(ustream, &content_size);
if (ret < 0) {
PERROR("ustctl_get_content_size");
goto error;
}
- index->content_size = htobe64(index->content_size);
- ret = ustctl_get_packet_size(ustream, &index->packet_size);
+ ret = ustctl_get_packet_size(ustream, &packet_size);
if (ret < 0) {
PERROR("ustctl_get_packet_size");
goto error;
}
- index->packet_size = htobe64(index->packet_size);
- ret = ustctl_get_stream_id(ustream, &index->stream_id);
+ ret = ustctl_get_stream_id(ustream, &stream_id);
if (ret < 0) {
PERROR("ustctl_get_stream_id");
goto error;
}
- index->stream_id = htobe64(index->stream_id);
- ret = ustctl_get_instance_id(ustream, &index->stream_instance_id);
+ ret = ustctl_get_instance_id(ustream, &stream_instance_id);
if (ret < 0) {
PERROR("ustctl_get_instance_id");
goto error;
}
- index->stream_instance_id = htobe64(index->stream_instance_id);
- ret = ustctl_get_sequence_number(ustream, &index->packet_seq_num);
+ ret = ustctl_get_sequence_number(ustream, &packet_seq_num);
if (ret < 0) {
PERROR("ustctl_get_sequence_number");
goto error;
}
- index->packet_seq_num = htobe64(index->packet_seq_num);
+
+ *index = (typeof(*index)) {
+ .offset = index->offset,
+ .packet_size = htobe64(packet_size),
+ .content_size = htobe64(content_size),
+ .timestamp_begin = htobe64(timestamp_begin),
+ .timestamp_end = htobe64(timestamp_end),
+ .events_discarded = htobe64(events_discarded),
+ .stream_id = htobe64(stream_id),
+ .stream_instance_id = htobe64(stream_instance_id),
+ .packet_seq_num = htobe64(packet_seq_num),
+ };
error:
return ret;
stream->ust_metadata_pushed);
ret = write_len;
+ /*
+ * Switch packet (but don't open the next one) on every commit of
+ * a metadata packet. Since the subbuffer is fully filled (with padding,
+ * if needed), the stream is "quiescent" after this commit.
+ */
+ ustctl_flush_buffer(stream->ustream, 1);
+ stream->quiescent = true;
end:
pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
return ret;
retry = 1;
}
- ustctl_flush_buffer(metadata->ustream, 1);
ret = ustctl_snapshot(metadata->ustream);
if (ret < 0) {
if (errno != EAGAIN) {
if (ret <= 0) {
goto end;
}
- ustctl_flush_buffer(stream->ustream, 1);
goto retry;
}
request.key = channel->key;
DBG("Sending metadata request to sessiond, session id %" PRIu64
- ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64,
+ ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
request.session_id, request.session_id_per_pid, request.uid,
request.key);