* Return 0 if everything has been flushed, 1 if there is data not flushed.
*/
int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
- uint64_t offset)
+ uint64_t offset, int timer)
{
int ret = 0;
struct lttng_consumer_stream *metadata_stream;
assert(channel->metadata_cache);
/*
- * XXX This consumer_data.lock should eventually be replaced by
- * a channel lock. It protects metadata_stream read and endpoint
- * status check.
+ * If not called from a timer handler, we have to take the
+ * channel lock to be mutually exclusive with channel teardown.
+ * Timer handler does not need to take this lock because it is
+ * already synchronized by timer stop (and, more importantly,
+ * taking this lock in a timer handler would cause a deadlock).
*/
- pthread_mutex_lock(&consumer_data.lock);
- pthread_mutex_lock(&channel->lock);
+ if (!timer) {
+ pthread_mutex_lock(&channel->lock);
+ }
pthread_mutex_lock(&channel->timer_lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
pthread_mutex_unlock(&channel->metadata_cache->lock);
pthread_mutex_unlock(&channel->timer_lock);
- pthread_mutex_unlock(&channel->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ if (!timer) {
+ pthread_mutex_unlock(&channel->lock);
+ }
return ret;
}
int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
- uint64_t offset);
+ uint64_t offset, int timer);
#endif /* CONSUMER_METADATA_CACHE_H */
if (stream->globally_visible) {
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
- pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
consumer_stream_delete(stream, ht);
consumer_data.need_update = 1;
pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
} else {
* - Calling lttng_ustconsumer_recv_metadata():
* - channel->metadata_cache->lock
* - Calling consumer_metadata_cache_flushed():
- * - consumer_data.lock
- * - channel->lock
- * - channel->metadata_cache->lock
+ * - channel->timer_lock
+ * - channel->metadata_cache->lock
*
- * Both consumer_data.lock and channel->lock currently
- * cause a deadlock, since they are held while
- * consumer_timer_switch_stop() is called.
+ * Ensure that neither consumer_data.lock nor
+ * channel->lock are taken within this function, since
+ * they are held while consumer_timer_switch_stop() is
+ * called.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, channel);
+ ret = lttng_ustconsumer_request_metadata(ctx, channel, 1);
if (ret < 0) {
channel->switch_timer_error = 1;
}
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
- pthread_mutex_lock(&channel->timer_lock);
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
call_rcu(&channel->node.head, free_channel_rcu);
end:
- pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
}
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
- pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
end:
/*
* Nullify the stream reference so it is not used after deletion. The
- * consumer data lock MUST be acquired before being able to check for a
- * NULL pointer value.
+ * channel lock MUST be acquired before being able to check for
+ * a NULL pointer value.
*/
stream->chan->metadata_stream = NULL;
pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
- pthread_mutex_lock(&channel->timer_lock);
if (cds_lfht_is_node_deleted(&channel->node.node)) {
goto error_unlock;
}
error_unlock:
- pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
error:
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0);
if (ret < 0) {
goto error;
}
* Receive the metadata updates from the sessiond.
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
- uint64_t len, struct lttng_consumer_channel *channel)
+ uint64_t len, struct lttng_consumer_channel *channel,
+ int timer)
{
int ret, ret_code = LTTNG_OK;
char *metadata_str;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
- while (consumer_metadata_cache_flushed(channel, offset + len)) {
+ while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
DBG("Waiting for metadata to be flushed");
usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
}
}
ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
- len, channel);
+ len, channel, 0);
if (ret < 0) {
/* error receiving from sessiond */
goto error_fatal;
* introduces deadlocks.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel)
+ struct lttng_consumer_channel *channel, int timer)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
}
ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, channel);
+ key, offset, len, channel, timer);
if (ret_code >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
void lttng_ustconsumer_close_metadata(struct lttng_ht *ht);
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream);
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
- uint64_t len, struct lttng_consumer_channel *channel);
+ uint64_t len, struct lttng_consumer_channel *channel,
+ int timer);
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel);
+ struct lttng_consumer_channel *channel, int timer);
#else /* HAVE_LIBLTTNG_UST_CTL */
}
static inline
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
- uint64_t len, struct lttng_consumer_channel *channel)
+ uint64_t len, struct lttng_consumer_channel *channel,
+ int timer)
{
return -ENOSYS;
}
static inline
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel)
+ struct lttng_consumer_channel *channel, int timer)
{
return -ENOSYS;
}