+ metadata_stream_reset_cache(stream, cache);
+
+end:
+ return ret;
+}
+
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed in the cache, or a negative value
+ * on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+ ssize_t write_len;
+ int ret;
+
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ ret = metadata_stream_check_version(stream);
+ if (ret < 0) {
+ goto end;
+ }
+ if (stream->chan->metadata_cache->max_offset
+ == stream->ust_metadata_pushed) {
+ ret = 0;
+ goto end;
+ }
+
+ write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+ &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+ stream->chan->metadata_cache->max_offset
+ - stream->ust_metadata_pushed);
+ assert(write_len != 0);
+ if (write_len < 0) {
+ ERR("Writing one metadata packet");
+ ret = -1;
+ goto end;
+ }
+ stream->ust_metadata_pushed += write_len;
+
+ assert(stream->chan->metadata_cache->max_offset >=
+ stream->ust_metadata_pushed);
+ ret = write_len;
+
+end:
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ return ret;
+}
+
+
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock is held here, but we need to release it when
+ * interacting with sessiond, else we cause a deadlock with live
+ * awaiting on metadata to be pushed out.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata)
+{
+ int ret;
+ int retry = 0;
+
+ assert(ctx);
+ assert(metadata);
+
+ pthread_mutex_unlock(&metadata->lock);
+ /*
+ * Request metadata from the sessiond, but don't wait for the flush
+ * because we locked the metadata thread.
+ */
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+ pthread_mutex_lock(&metadata->lock);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = commit_one_metadata_packet(metadata);
+ if (ret <= 0) {
+ goto end;
+ } else if (ret > 0) {
+ retry = 1;
+ }
+
+ ustctl_flush_buffer(metadata->ustream, 1);
+ ret = ustctl_snapshot(metadata->ustream);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking UST snapshot");
+ goto end;
+ }
+ DBG("No new metadata when syncing them.");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+ /*
+ * After this flush, we still need to extract metadata.
+ */
+ if (retry) {
+ ret = EAGAIN;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct ustctl_consumer_stream *ustream;
+
+ assert(stream);
+ assert(ctx);
+
+ ustream = stream->ustream;
+
+ /*
+ * First, we are going to check if there is a new subbuffer available
+ * before reading the stream wait_fd.
+ */
+ /* Get the next subbuffer */
+ ret = ustctl_get_next_subbuf(ustream);
+ if (ret) {
+ /* No more data found, flag the stream. */
+ stream->has_data = 0;
+ ret = 0;
+ goto end;
+ }
+
+ ret = ustctl_put_subbuf(ustream);
+ assert(!ret);
+
+ /* This stream still has data. Flag it and wake up the data thread. */
+ stream->has_data = 1;
+
+ if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+ ssize_t writelen;
+
+ writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+ if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ ret = writelen;
+ goto end;
+ }
+
+ /* The wake up pipe has been notified. */
+ ctx->has_wakeup = 1;
+ }
+ ret = 0;
+
+end:
+ return ret;
+}
+
+static
+int update_stream_stats(struct lttng_consumer_stream *stream)
+{
+ int ret;
+ uint64_t seq, discarded;
+
+ ret = ustctl_get_sequence_number(stream->ustream, &seq);
+ if (ret < 0) {
+ PERROR("ustctl_get_sequence_number");
+ goto end;
+ }
+ /*
+ * Start the sequence when we extract the first packet in case we don't
+ * start at 0 (for example if a consumer is not connected to the
+ * session immediately after the beginning).
+ */
+ if (stream->last_sequence_number == -1ULL) {
+ stream->last_sequence_number = seq;
+ } else if (seq > stream->last_sequence_number) {
+ stream->chan->lost_packets += seq -
+ stream->last_sequence_number - 1;
+ } else {
+ /* seq <= last_sequence_number */
+ ERR("Sequence number inconsistent : prev = %" PRIu64
+ ", current = %" PRIu64,
+ stream->last_sequence_number, seq);
+ ret = -1;
+ goto end;
+ }
+ stream->last_sequence_number = seq;
+
+ ret = ustctl_get_events_discarded(stream->ustream, &discarded);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto end;
+ }
+ if (discarded < stream->last_discarded_events) {
+ /*
+ * Overflow has occurred. We assume only one wrap-around
+ * has occurred.
+ */
+ stream->chan->discarded_events +=
+ (1ULL << (CAA_BITS_PER_LONG - 1)) -
+ stream->last_discarded_events + discarded;
+ } else {
+ stream->chan->discarded_events += discarded -
+ stream->last_discarded_events;
+ }
+ stream->last_discarded_events = discarded;
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Read subbuffer from the given stream.
+ *
+ * Stream lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ unsigned long len, subbuf_size, padding;
+ int err, write_index = 1;
+ long ret = 0;
+ struct ustctl_consumer_stream *ustream;
+ struct ctf_packet_index index;
+
+ assert(stream);
+ assert(stream->ustream);
+ assert(ctx);
+
+ DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
+ stream->name);
+
+ /* Ease our life for what's next. */
+ ustream = stream->ustream;
+
+ /*
+ * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
+ * error if we cannot read this one byte (read returns 0), or if the error
+ * is EAGAIN or EWOULDBLOCK.
+ *
+ * This is only done when the stream is monitored by a thread, before the
+ * flush is done after a hangup and if the stream is not flagged with data
+ * since there might be nothing to consume in the wait fd but still have
+ * data available flagged by the consumer wake up pipe.
+ */
+ if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+ char dummy;
+ ssize_t readlen;
+
+ readlen = lttng_read(stream->wait_fd, &dummy, 1);
+ if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ ret = readlen;
+ goto end;
+ }
+ }
+
+retry:
+ /* Get the next subbuffer */
+ err = ustctl_get_next_subbuf(ustream);
+ if (err != 0) {
+ /*
+ * Populate metadata info if the existing info has
+ * already been read.
+ */
+ if (stream->metadata_flag) {
+ ret = commit_one_metadata_packet(stream);
+ if (ret <= 0) {
+ goto end;
+ }
+ ustctl_flush_buffer(stream->ustream, 1);
+ goto retry;
+ }
+
+ ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
+ /*
+ * This is a debug message even for single-threaded consumer,
+ * because poll() have more relaxed criterions than get subbuf,
+ * so get_subbuf may fail for short race windows where poll()
+ * would issue wakeups.
+ */
+ DBG("Reserving sub buffer failed (everything is normal, "
+ "it is due to concurrency) [ret: %d]", err);
+ goto end;
+ }
+ assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+ if (!stream->metadata_flag) {
+ index.offset = htobe64(stream->out_fd_offset);
+ ret = get_index_values(&index, ustream);
+ if (ret < 0) {
+ err = ustctl_put_subbuf(ustream);
+ assert(err == 0);
+ goto end;
+ }
+
+ /* Update the stream's sequence and discarded events count. */
+ ret = update_stream_stats(stream);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ err = ustctl_put_subbuf(ustream);
+ assert(err == 0);
+ goto end;
+ }
+ } else {
+ write_index = 0;
+ }
+
+ /* Get the full padded subbuffer size */
+ err = ustctl_get_padded_subbuf_size(ustream, &len);
+ assert(err == 0);
+
+ /* Get subbuffer data size (without padding) */
+ err = ustctl_get_subbuf_size(ustream, &subbuf_size);
+ assert(err == 0);
+
+ /* Make sure we don't get a subbuffer size bigger than the padded */
+ assert(len >= subbuf_size);
+
+ padding = len - subbuf_size;
+ /* write the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
+ /*
+ * The mmap operation should write subbuf_size amount of data when network
+ * streaming or the full padding (len) size when we are _not_ streaming.
+ */
+ if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+ (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
+ /*
+ * Display the error but continue processing to try to release the
+ * subbuffer. This is a DBG statement since any unexpected kill or
+ * signal, the application gets unregistered, relayd gets closed or
+ * anything that affects the buffer lifetime will trigger this error.
+ * So, for the sake of the user, don't print this error since it can
+ * happen and it is OK with the code flow.
+ */
+ DBG("Error writing to tracefile "
+ "(ret: %ld != len: %lu != subbuf_size: %lu)",
+ ret, len, subbuf_size);
+ write_index = 0;
+ }
+ err = ustctl_put_next_subbuf(ustream);
+ assert(err == 0);
+
+ /*
+ * This will consumer the byte on the wait_fd if and only if there is not
+ * next subbuffer to be acquired.
+ */
+ if (!stream->metadata_flag) {
+ ret = notify_if_more_data(stream, ctx);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+
+ /* Write index if needed. */
+ if (!write_index) {
+ goto end;
+ }
+
+ if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+ /*
+ * In live, block until all the metadata is sent.
+ */
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ assert(!stream->missed_metadata_flush);
+ stream->waiting_on_metadata = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+
+ err = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ stream->waiting_on_metadata = false;
+ if (stream->missed_metadata_flush) {
+ stream->missed_metadata_flush = false;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ (void) consumer_flush_ust_index(stream);
+ } else {
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ }
+
+ if (err < 0) {
+ goto end;
+ }
+ }
+
+ assert(!stream->metadata_flag);
+ err = consumer_stream_write_index(stream, &index);
+ if (err < 0) {
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Called when a stream is created.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ /* Don't create anything if this is set for streaming. */
+ if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
+ ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+ stream->chan->tracefile_size, stream->tracefile_count_current,
+ stream->uid, stream->gid, NULL);
+ if (ret < 0) {
+ goto error;
+ }
+ stream->out_fd = ret;
+ stream->tracefile_size_current = 0;
+
+ if (!stream->metadata_flag) {
+ struct lttng_index_file *index_file;
+
+ index_file = lttng_index_file_create(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!index_file) {
+ goto error;
+ }
+ assert(!stream->index_file);
+ stream->index_file = index_file;
+ }
+ }
+ ret = 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function
+ * and the stream lock.
+ *
+ * Return 1 if the traced data are still getting read else 0 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(stream->ustream);
+
+ DBG("UST consumer checking data pending");
+
+ if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+ ret = 0;
+ goto end;
+ }
+
+ if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ uint64_t contiguous, pushed;
+
+ /* Ease our life a bit. */
+ contiguous = stream->chan->metadata_cache->max_offset;
+ pushed = stream->ust_metadata_pushed;
+
+ /*
+ * We can simply check whether all contiguously available data
+ * has been pushed to the ring buffer, since the push operation
+ * is performed within get_next_subbuf(), and because both
+ * get_next_subbuf() and put_next_subbuf() are issued atomically
+ * thanks to the stream lock within
+ * lttng_ustconsumer_read_subbuffer(). This basically means that
+ * whetnever ust_metadata_pushed is incremented, the associated
+ * metadata has been consumed from the metadata stream.
+ */
+ DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
+ contiguous, pushed);
+ assert(((int64_t) (contiguous - pushed)) >= 0);
+ if ((contiguous != pushed) ||
+ (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
+ ret = 1; /* Data is pending */
+ goto end;
+ }
+ } else {
+ ret = ustctl_get_next_subbuf(stream->ustream);
+ if (ret == 0) {
+ /*
+ * There is still data so let's put back this
+ * subbuffer.
+ */
+ ret = ustctl_put_subbuf(stream->ustream);
+ assert(ret == 0);
+ ret = 1; /* Data is pending */
+ goto end;
+ }
+ }