Fix: sessiond vs consumerd push/get metadata deadlock
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 19 Aug 2015 21:44:59 +0000 (14:44 -0700)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 8 Sep 2015 13:10:39 +0000 (09:10 -0400)
We need to unlock the registry while we push metadata to break a
circular dependency between the consumerd metadata lock and the sessiond
registry lock. Indeed, pushing metadata to the consumerd awaits that it
gets pushed all the way to relayd, but doing so requires grabbing the
metadata lock. If a concurrent metadata request is being performed by
consumerd, this can try to grab the registry lock on the sessiond while
holding the metadata lock on the consumer daemon. Those push and pull
schemes are performed on two different bidirectionnal communication
sockets.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/ust-app.c
src/common/consumer-metadata-cache.c
src/common/consumer-metadata-cache.h
src/common/consumer-timer.c
src/common/consumer-timer.h
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 96ba2f4bcad0649afe1cfcec93e2f6c1f13002df..fc4b7085ec5770e57550c169924e112bb794dd49 100644 (file)
@@ -440,17 +440,20 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
 {
        int ret;
        char *metadata_str = NULL;
-       size_t len, offset;
+       size_t len, offset, new_metadata_len_sent;
        ssize_t ret_val;
+       uint64_t metadata_key;
 
        assert(registry);
        assert(socket);
 
+       metadata_key = registry->metadata_key;
+
        /*
         * Means that no metadata was assigned to the session. This can
         * happens if no start has been done previously.
         */
-       if (!registry->metadata_key) {
+       if (!metadata_key) {
                return 0;
        }
 
@@ -468,6 +471,7 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
 
        offset = registry->metadata_len_sent;
        len = registry->metadata_len - registry->metadata_len_sent;
+       new_metadata_len_sent = registry->metadata_len;
        if (len == 0) {
                DBG3("No metadata to push for metadata key %" PRIu64,
                                registry->metadata_key);
@@ -486,13 +490,26 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
                ret_val = -ENOMEM;
                goto error;
        }
-       /* Copy what we haven't send out. */
+       /* Copy what we haven't sent out. */
        memcpy(metadata_str, registry->metadata + offset, len);
-       registry->metadata_len_sent += len;
 
 push_data:
-       ret = consumer_push_metadata(socket, registry->metadata_key,
+       pthread_mutex_unlock(&registry->lock);
+       /*
+        * We need to unlock the registry while we push metadata to
+        * break a circular dependency between the consumerd metadata
+        * lock and the sessiond registry lock. Indeed, pushing metadata
+        * to the consumerd awaits that it gets pushed all the way to
+        * relayd, but doing so requires grabbing the metadata lock. If
+        * a concurrent metadata request is being performed by
+        * consumerd, this can try to grab the registry lock on the
+        * sessiond while holding the metadata lock on the consumer
+        * daemon. Those push and pull schemes are performed on two
+        * different bidirectionnal communication sockets.
+        */
+       ret = consumer_push_metadata(socket, metadata_key,
                        metadata_str, len, offset);
+       pthread_mutex_lock(&registry->lock);
        if (ret < 0) {
                /*
                 * There is an acceptable race here between the registry
@@ -510,17 +527,29 @@ push_data:
                 */
                if (ret == -LTTCOMM_CONSUMERD_CHANNEL_FAIL) {
                        ret = 0;
+               } else {
+                       ERR("Error pushing metadata to consumer");
                }
-
-               /*
-                * Update back the actual metadata len sent since it
-                * failed here.
-                */
-               registry->metadata_len_sent -= len;
                ret_val = ret;
                goto error_push;
+       } else {
+               /*
+                * Metadata may have been concurrently pushed, since
+                * we're not holding the registry lock while pushing to
+                * consumer.  This is handled by the fact that we send
+                * the metadata content, size, and the offset at which
+                * that metadata belongs. This may arrive out of order
+                * on the consumer side, and the consumer is able to
+                * deal with overlapping fragments. The consumer
+                * supports overlapping fragments, which must be
+                * contiguous starting from offset 0. We keep the
+                * largest metadata_len_sent value of the concurrent
+                * send.
+                */
+               registry->metadata_len_sent =
+                       max_t(size_t, registry->metadata_len_sent,
+                               new_metadata_len_sent);
        }
-
        free(metadata_str);
        return len;
 
index 9cd99e5bf4d29f090baeb0c8326ce10ba880f2c7..6774692316a6dc4fe1b3e1a0fdf33d09fbc0c960 100644 (file)
@@ -73,8 +73,8 @@ end:
 
 /*
  * Write metadata to the cache, extend the cache if necessary. We support
- * non-contiguous updates but not overlapping ones. If there is contiguous
- * metadata in the cache, we send it to the ring buffer. The metadata cache
+ * overlapping updates, but they need to be contiguous. Send the
+ * contiguous metadata in cache to the ring buffer. The metadata cache
  * lock MUST be acquired to write in the cache.
  *
  * Return 0 on success, a negative value on error.
@@ -102,15 +102,10 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
        }
 
        memcpy(cache->data + offset, data, len);
-       cache->total_bytes_written += len;
        if (offset + len > cache->max_offset) {
-               cache->max_offset = offset + len;
-       }
-
-       if (cache->max_offset == cache->total_bytes_written) {
                char dummy = 'c';
 
-               cache->contiguous = cache->max_offset;
+               cache->max_offset = offset + len;
                if (channel->monitor) {
                        size_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
                                        &dummy, 1);
index aaf9f24d2a5a978b4c0024d290be86910a1ae0df..e7aba4ac923778fea3594ce9161390bd1286450a 100644 (file)
 struct consumer_metadata_cache {
        char *data;
        uint64_t cache_alloc_size;
-       /*
-        * How many bytes from the cache are written contiguously.
-        */
-       uint64_t contiguous;
-       /*
-        * How many bytes are written in the buffer (excluding the wholes).
-        */
-       uint64_t total_bytes_written;
        /*
         * The upper-limit of data written inside the buffer.
         *
         * With the total_bytes_written it allows us to keep track of when the
         * cache contains contiguous metadata ready to be sent to the RB.
-        * The metadata cache updates must not overlap.
+        * All cached data is contiguous.
         */
        uint64_t max_offset;
        /*
index 646d32342cdfa0a7a7552332dd3f5da2c90cc680..8bf3ae80c2c4f4f35bf91780057f11f5f6a7b786 100644 (file)
@@ -133,78 +133,103 @@ error:
        return ret;
 }
 
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
 {
        uint64_t ts, stream_id;
        int ret;
 
-       /*
-        * While holding the stream mutex, try to take a snapshot, if it
-        * succeeds, it means that data is ready to be sent, just let the data
-        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
-        * means that there is no data to read after the flush, so we can
-        * safely send the empty index.
-        */
-       pthread_mutex_lock(&stream->lock);
        ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_buffer_flush(stream->wait_fd);
        if (ret < 0) {
                ERR("Failed to flush kernel stream");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_snapshot(stream->wait_fd);
        if (ret < 0) {
                if (errno != EAGAIN && errno != ENODATA) {
                        PERROR("live timer kernel snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
                }
                ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
                if (ret < 0) {
                        PERROR("kernctl_get_stream_id");
-                       goto error_unlock;
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
                ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
-
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
-static int check_ust_stream(struct lttng_consumer_stream *stream)
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
 {
-       uint64_t ts, stream_id;
        int ret;
 
-       assert(stream);
-       assert(stream->ustream);
        /*
         * While holding the stream mutex, try to take a snapshot, if it
         * succeeds, it means that data is ready to be sent, just let the data
         * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
         * means that there is no data to read after the flush, so we can
         * safely send the empty index.
+        *
+        * Doing a trylock and checking if waiting on metadata if
+        * trylock fails. Bail out of the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
         */
-       pthread_mutex_lock(&stream->lock);
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_kernel_index(stream);
+       pthread_mutex_unlock(&stream->lock);
+end:
+       return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+       uint64_t ts, stream_id;
+       int ret;
+
        ret = cds_lfht_is_node_deleted(&stream->node.node);
        if (ret) {
-               goto error_unlock;
+               goto end;
        }
 
        ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        lttng_ustconsumer_flush_buffer(stream, 1);
        ret = lttng_ustconsumer_take_snapshot(stream);
@@ -212,23 +237,68 @@ static int check_ust_stream(struct lttng_consumer_stream *stream)
                if (ret != -EAGAIN) {
                        ERR("Taking UST snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
                }
                ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
                if (ret < 0) {
                        PERROR("ustctl_get_stream_id");
-                       goto error_unlock;
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
                ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
+end:
+       return ret;
+}
 
-error_unlock:
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+       assert(stream->ustream);
+       /*
+        * While holding the stream mutex, try to take a snapshot, if it
+        * succeeds, it means that data is ready to be sent, just let the data
+        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+        * means that there is no data to read after the flush, so we can
+        * safely send the empty index.
+        *
+        * Doing a trylock and checking if waiting on metadata if
+        * trylock fails. Bail out of the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
+        */
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_ust_index(stream);
        pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
index baaa82b04993a1ee1173ccb5fc5952555b1cb95c..22e74574ce5622d7ec7e242b83865fa766b1f639 100644 (file)
@@ -52,4 +52,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel);
 void *consumer_timer_thread(void *data);
 int consumer_signal_init(void);
 
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream);
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream);
+
 #endif /* CONSUMER_TIMER_H */
index effa5f86f38cfa67247fdbd17a214c34f42c5703..526fbbf35c157f2bf545ef53b68dc0e45ef17365 100644 (file)
@@ -563,6 +563,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
+       pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
        if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
index 509e24e01af7bb391ba2ddaeb207b90688be6313..ac3b4903f6b8c3e945432b35a471100a8998aede 100644 (file)
@@ -242,6 +242,21 @@ struct lttng_consumer_stream {
        int shm_fd_is_copy;
        int data_read;
        int hangup_flush_done;
+
+       /*
+        * metadata_timer_lock protects flags waiting_on_metadata and
+        * missed_metadata_flush.
+        */
+       pthread_mutex_t metadata_timer_lock;
+       /*
+        * Flag set when awaiting metadata to be pushed. Used in the
+        * timer thread to skip waiting on the stream (and stream lock) to
+        * ensure we can proceed to flushing metadata in live mode.
+        */
+       bool waiting_on_metadata;
+       /* Raised when a timer misses a metadata flush. */
+       bool missed_metadata_flush;
+
        enum lttng_event_output output;
        /* Maximum subbuffer size. */
        unsigned long max_sb_size;
index e30d21b1ac59e77437ed983e39b5a8ada2e17dff..aae56f90cf8e0765f6b95d65a375f06a5f1891fe 100644 (file)
@@ -1219,7 +1219,22 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                /*
                 * In live, block until all the metadata is sent.
                 */
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               assert(!stream->missed_metadata_flush);
+               stream->waiting_on_metadata = true;
+               pthread_mutex_unlock(&stream->metadata_timer_lock);
+
                err = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               stream->waiting_on_metadata = false;
+               if (stream->missed_metadata_flush) {
+                       stream->missed_metadata_flush = false;
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       (void) consumer_flush_kernel_index(stream);
+               } else {
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+               }
                if (err < 0) {
                        goto end;
                }
index 819817d149922d5b2a6b02eb42230a3e35f21fe6..7dfcf9a3cf46381675222dfbcb94a474e0e8eaa9 100644 (file)
@@ -1167,7 +1167,12 @@ error:
 }
 
 /*
- * Receive the metadata updates from the sessiond.
+ * Receive the metadata updates from the sessiond. Supports receiving
+ * overlapping metadata, but is needs to always belong to a contiguous
+ * range starting from 0.
+ * Be careful about the locks held when calling this function: it needs
+ * the metadata cache flush to concurrently progress in order to
+ * complete.
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
@@ -1581,6 +1586,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                health_code_update();
 
+               if (!len) {
+                       /*
+                        * There is nothing to receive. We have simply
+                        * checked whether the channel can be found.
+                        */
+                       ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+                       goto end_msg_sessiond;
+               }
+
                /* Tell session daemon we are ready to receive the metadata. */
                ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
                if (ret < 0) {
@@ -1942,7 +1956,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        int ret;
 
        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
-       if (stream->chan->metadata_cache->contiguous
+       if (stream->chan->metadata_cache->max_offset
                        == stream->ust_metadata_pushed) {
                ret = 0;
                goto end;
@@ -1950,7 +1964,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
 
        write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
                        &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
-                       stream->chan->metadata_cache->contiguous
+                       stream->chan->metadata_cache->max_offset
                        - stream->ust_metadata_pushed);
        assert(write_len != 0);
        if (write_len < 0) {
@@ -1960,7 +1974,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        }
        stream->ust_metadata_pushed += write_len;
 
-       assert(stream->chan->metadata_cache->contiguous >=
+       assert(stream->chan->metadata_cache->max_offset >=
                        stream->ust_metadata_pushed);
        ret = write_len;
 
@@ -1974,7 +1988,9 @@ end:
  * Sync metadata meaning request them to the session daemon and snapshot to the
  * metadata thread can consumer them.
  *
- * Metadata stream lock MUST be acquired.
+ * Metadata stream lock is held here, but we need to release it when
+ * interacting with sessiond, else we cause a deadlock with live
+ * awaiting on metadata to be pushed out.
  *
  * Return 0 if new metadatda is available, EAGAIN if the metadata stream
  * is empty or a negative value on error.
@@ -1988,6 +2004,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
        assert(ctx);
        assert(metadata);
 
+       pthread_mutex_unlock(&metadata->lock);
        /*
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
@@ -1996,6 +2013,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
        if (ret < 0) {
                goto end;
        }
+       pthread_mutex_lock(&metadata->lock);
 
        ret = commit_one_metadata_packet(metadata);
        if (ret <= 0) {
@@ -2222,7 +2240,23 @@ retry:
                /*
                 * In live, block until all the metadata is sent.
                 */
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               assert(!stream->missed_metadata_flush);
+               stream->waiting_on_metadata = true;
+               pthread_mutex_unlock(&stream->metadata_timer_lock);
+
                err = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               stream->waiting_on_metadata = false;
+               if (stream->missed_metadata_flush) {
+                       stream->missed_metadata_flush = false;
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       (void) consumer_flush_ust_index(stream);
+               } else {
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+               }
+
                if (err < 0) {
                        goto end;
                }
@@ -2303,7 +2337,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                uint64_t contiguous, pushed;
 
                /* Ease our life a bit. */
-               contiguous = stream->chan->metadata_cache->contiguous;
+               contiguous = stream->chan->metadata_cache->max_offset;
                pushed = stream->ust_metadata_pushed;
 
                /*
@@ -2432,6 +2466,10 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  * function or any of its callees. Timers have a very strict locking
  * semantic with respect to teardown. Failure to respect this semantic
  * introduces deadlocks.
+ *
+ * DON'T hold the metadata lock when calling this function, else this
+ * can cause deadlock involving consumer awaiting for metadata to be
+ * pushed out due to concurrent interaction with the session daemon.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *channel, int timer, int wait)
This page took 0.035476 seconds and 4 git commands to generate.