consumer: remove timeout for UST metadata
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 16 Jul 2013 00:53:04 +0000 (20:53 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 16 Jul 2013 18:33:20 +0000 (14:33 -0400)
Remove time out for UST metadata by generating metadata packets whenever
they are about to be read from the ring buffer rather than filling them
when the metadata cache is updated, which requires a time out, and
therefore may fail.

Reviewed-by: Julien Desfossez <julien.desfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer-metadata-cache.c
src/common/consumer-metadata-cache.h
src/common/consumer.c
src/common/ust-consumer/ust-consumer.c

index 4c8a665af0db7aa1d5d8224edcfac96c5fc2fbb2..173cac04976a95f77ff35e128e594324f3c2f240 100644 (file)
@@ -105,15 +105,17 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
        }
 
        if (cache->max_offset == cache->total_bytes_written) {
-               offset = cache->rb_pushed;
-               len = cache->total_bytes_written - cache->rb_pushed;
-               ret = lttng_ustconsumer_push_metadata(channel, cache->data, offset,
-                               len);
-               if (ret < 0) {
-                       ERR("Pushing metadata");
-                       goto end;
+               char dummy = 'c';
+
+               cache->contiguous = cache->max_offset;
+               if (channel->monitor) {
+                       ret = write(channel->metadata_stream->ust_metadata_poll_pipe[1],
+                                       &dummy, 1);
+                       if (ret < 1) {
+                               ERR("Wakeup UST metadata pipe");
+                               goto end;
+                       }
                }
-               cache->rb_pushed += len;
        }
 
 end:
@@ -177,11 +179,6 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel)
 
        DBG("Destroying metadata cache");
 
-       if (channel->metadata_cache->max_offset >
-                       channel->metadata_cache->rb_pushed) {
-               ERR("Destroying a cache not entirely commited");
-       }
-
        pthread_mutex_destroy(&channel->metadata_cache->lock);
        free(channel->metadata_cache->data);
        free(channel->metadata_cache);
@@ -195,14 +192,12 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel)
 int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
                uint64_t offset)
 {
-       int ret;
-       struct consumer_metadata_cache *cache;
+       int ret = 0;
+       struct lttng_consumer_stream *metadata_stream;
 
        assert(channel);
        assert(channel->metadata_cache);
 
-       cache = channel->metadata_cache;
-
        /*
         * XXX This consumer_data.lock should eventually be replaced by
         * a channel lock. It protects metadata_stream read and endpoint
@@ -212,14 +207,16 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
        pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->metadata_cache->lock);
 
-       if (cache->rb_pushed >= offset) {
-               ret = 0;
-       } else if (!channel->metadata_stream) {
+       metadata_stream = channel->metadata_stream;
+
+       if (!metadata_stream) {
                /*
                 * Having no metadata stream means the channel is being destroyed so there
                 * is no cache to flush anymore.
                 */
                ret = 0;
+       } else if (metadata_stream->ust_metadata_pushed >= offset) {
+               ret = 0;
        } else if (channel->metadata_stream->endpoint_status !=
                        CONSUMER_ENDPOINT_ACTIVE) {
                /* An inactive endpoint means we don't have to flush anymore. */
index b1a4dc2b777fad31b91a8ecf040b439f180776c3..8f485d6390feda1e399de76348aa15fdf371ef5d 100644 (file)
@@ -25,9 +25,9 @@ struct consumer_metadata_cache {
        char *data;
        uint64_t cache_alloc_size;
        /*
-        * How many bytes from the cache were already sent to the ring buffer.
+        * How many bytes from the cache are written contiguously.
         */
-       uint64_t rb_pushed;
+       uint64_t contiguous;
        /*
         * How many bytes are written in the buffer (excluding the wholes).
         */
index 94a0cc3efb1d557da732ed9a201848b0ee4dc3e4..a26a41554d2e379c2e97ad4cca20929627571468 100644 (file)
@@ -1880,6 +1880,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               if (stream->monitor) {
+                       /* close the write-side in close_metadata */
+                       ret = close(stream->ust_metadata_poll_pipe[0]);
+                       if (ret < 0) {
+                               PERROR("Close UST metadata read-side poll pipe");
+                       }
+               }
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -2252,14 +2259,21 @@ restart:
                                DBG("Metadata available on fd %d", pollfd);
                                assert(stream->wait_fd == pollfd);
 
-                               len = ctx->on_buffer_ready(stream, ctx);
+                               do {
+                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       /*
+                                        * We don't check the return value here since if we get
+                                        * a negative len, it means an error occured thus we
+                                        * simply remove it from the poll set and free the
+                                        * stream.
+                                        */
+                               } while (len > 0);
+
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean up stream from consumer and free it. */
                                        lttng_poll_del(&events, stream->wait_fd);
                                        consumer_del_metadata_stream(stream, metadata_ht);
-                               } else if (len > 0) {
-                                       stream->data_read = 1;
                                }
                        }
 
index c1e987435581600d79bbaadd7af72acf0f186849..6b47ec0c9b2739cb04b07808a03890eee065e2fa 100644 (file)
@@ -230,8 +230,18 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
         */
        while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
                int wait_fd;
+               int ust_metadata_pipe[2];
 
-               wait_fd = ustctl_stream_get_wait_fd(ustream);
+               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
+                       ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
+                       if (ret < 0) {
+                               ERR("Create ust metadata poll pipe");
+                               goto error;
+                       }
+                       wait_fd = ust_metadata_pipe[0];
+               } else {
+                       wait_fd = ustctl_stream_get_wait_fd(ustream);
+               }
 
                /* Allocate consumer stream object. */
                stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
@@ -285,6 +295,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                /* Keep stream reference when creating metadata. */
                if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
                        channel->metadata_stream = stream;
+                       stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0];
+                       stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1];
                }
        }
 
@@ -656,6 +668,13 @@ static int close_metadata(uint64_t chan_key)
                        ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                        goto error_unlock;
                }
+               if (channel->monitor) {
+                       /* close the read-side in consumer_del_metadata_stream */
+                       ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]);
+                       if (ret < 0) {
+                               PERROR("Close UST metadata write-side poll pipe");
+                       }
+               }
        }
 
 error_unlock:
@@ -752,8 +771,6 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
                struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
-       ssize_t write_len;
-       uint64_t total_len = 0;
        struct lttng_consumer_channel *metadata_channel;
        struct lttng_consumer_stream *metadata_stream;
 
@@ -813,30 +830,13 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
        }
 
        pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
-       while (total_len < metadata_channel->metadata_cache->total_bytes_written) {
-               /*
-                * Write at most one packet of metadata into the channel
-                * to avoid blocking here.
-                */
-               write_len = ustctl_write_one_packet_to_channel(metadata_channel->uchan,
-                               metadata_channel->metadata_cache->data,
-                               metadata_channel->metadata_cache->total_bytes_written);
-               if (write_len < 0) {
-                       ERR("UST consumer snapshot writing metadata packet");
-                       ret = -1;
-                       goto error_unlock;
-               }
-               total_len += write_len;
 
-               DBG("Written %" PRIu64 " bytes to metadata (left: %" PRIu64 ")",
-                               write_len,
-                               metadata_channel->metadata_cache->total_bytes_written - write_len);
-               ustctl_flush_buffer(metadata_stream->ustream, 1);
+       do {
                ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
                if (ret < 0) {
                        goto error_unlock;
                }
-       }
+       } while (ret > 0);
 
 error_unlock:
        pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
@@ -1623,21 +1623,50 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ustream = stream->ustream;
 
        /* We can consume the 1 byte written into the wait_fd by UST */
-       if (!stream->hangup_flush_done) {
+       if (stream->monitor && !stream->hangup_flush_done) {
                ssize_t readlen;
 
                do {
                        readlen = read(stream->wait_fd, &dummy, 1);
                } while (readlen == -1 && errno == EINTR);
-               if (readlen == -1) {
+               if (readlen == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        ret = readlen;
                        goto end;
                }
        }
 
+retry:
        /* Get the next subbuffer */
        err = ustctl_get_next_subbuf(ustream);
        if (err != 0) {
+               /*
+                * Populate metadata info if the existing info has
+                * already been read.
+                */
+               if (stream->metadata_flag) {
+                       ssize_t write_len;
+
+                       if (stream->chan->metadata_cache->contiguous
+                                       == stream->ust_metadata_pushed) {
+                               ret = 0;
+                               goto end;
+                       }
+
+                       write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+                                       &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+                                       stream->chan->metadata_cache->contiguous
+                                               - stream->ust_metadata_pushed);
+                       assert(write_len != 0);
+                       if (write_len < 0) {
+                               ERR("Writing one metadata packet");
+                               ret = -1;
+                               goto end;
+                       }
+                       stream->ust_metadata_pushed += write_len;
+                       ustctl_flush_buffer(stream->ustream, 1);
+                       goto retry;
+               }
+
                ret = err;      /* ustctl_get_next_subbuf returns negative, caller expect positive. */
                /*
                 * This is a debug message even for single-threaded consumer,
@@ -1739,13 +1768,37 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                goto end;
        }
 
-       ret = ustctl_get_next_subbuf(stream->ustream);
-       if (ret == 0) {
-               /* There is still data so let's put back this subbuffer. */
-               ret = ustctl_put_subbuf(stream->ustream);
-               assert(ret == 0);
-               ret = 1;  /* Data is pending */
-               goto end;
+       if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               /*
+                * We can simply check whether all contiguously available data
+                * has been pushed to the ring buffer, since the push operation
+                * is performed within get_next_subbuf(), and because both
+                * get_next_subbuf() and put_next_subbuf() are issued atomically
+                * thanks to the stream lock within
+                * lttng_ustconsumer_read_subbuffer(). This basically means that
+                * whetnever ust_metadata_pushed is incremented, the associated
+                * metadata has been consumed from the metadata stream.
+                */
+               DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
+                       stream->chan->metadata_cache->contiguous,
+                       stream->ust_metadata_pushed);
+               if (stream->chan->metadata_cache->contiguous
+                               != stream->ust_metadata_pushed) {
+                       ret = 1;        /* Data is pending */
+                       goto end;
+               }
+       } else {
+               ret = ustctl_get_next_subbuf(stream->ustream);
+               if (ret == 0) {
+                       /*
+                        * There is still data so let's put back this
+                        * subbuffer.
+                        */
+                       ret = ustctl_put_subbuf(stream->ustream);
+                       assert(ret == 0);
+                       ret = 1;        /* Data is pending */
+                       goto end;
+               }
        }
 
        /* Data is NOT pending so ready to be read. */
This page took 0.032298 seconds and 4 git commands to generate.