free(cmd);
cmd = NULL;
} else {
- lttng_waiter_wake_up(&cmd->reply_waiter);
+ lttng_waiter_wake(&cmd->reply_waiter);
}
return ret;
error_unlock:
/* Wake-up and return a fatal error to the calling thread. */
- lttng_waiter_wake_up(&cmd->reply_waiter);
+ lttng_waiter_wake(&cmd->reply_waiter);
cmd->reply_code = LTTNG_ERR_FATAL;
error:
/* Indicate a fatal error to the caller. */
/*
* Check if the cache is flushed up to the offset passed in parameter.
*
- * Return 0 if everything has been flushed, 1 if there is data not flushed.
+ * Return true if everything has been flushed, false if some data remains unflushed.
*/
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+static
+bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel,
uint64_t offset, int timer)
{
- int ret = 0;
+ bool done_flushing = false;
struct lttng_consumer_stream *metadata_stream;
- assert(channel);
- assert(channel->metadata_cache);
-
/*
* If not called from a timer handler, we have to take the
* channel lock to be mutually exclusive with channel teardown.
* Having no metadata stream means the channel is being destroyed so there
* is no cache to flush anymore.
*/
- ret = 0;
+ done_flushing = true;
goto end_unlock_channel;
}
pthread_mutex_lock(&channel->metadata_cache->lock);
if (metadata_stream->ust_metadata_pushed >= offset) {
- ret = 0;
+ done_flushing = true;
} else if (channel->metadata_stream->endpoint_status !=
CONSUMER_ENDPOINT_ACTIVE) {
/* An inactive endpoint means we don't have to flush anymore. */
- ret = 0;
+ done_flushing = true;
} else {
/* Still not completely flushed. */
- ret = 1;
+ done_flushing = false;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
pthread_mutex_unlock(&metadata_stream->lock);
+
end_unlock_channel:
pthread_mutex_unlock(&channel->timer_lock);
if (!timer) {
pthread_mutex_unlock(&channel->lock);
}
- return ret;
+ return done_flushing;
+}
+
+/*
+ * Wait until the cache is flushed up to the offset passed as parameter, or
+ * until the metadata stream has been destroyed.
+ */
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+ uint64_t offset, bool invoked_by_timer)
+{
+ assert(channel);
+ assert(channel->metadata_cache);
+
+ if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+ return;
+ }
+
+ /* The metadata cache is not yet flushed; wait on the wait queue. */
+ for (;;) {
+ struct lttng_waiter waiter;
+
+ lttng_waiter_init(&waiter);
+ lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter);
+ if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+ /* Wake up all waiters, ourselves included. */
+ lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+ /* Ensure proper teardown of waiter. */
+ lttng_waiter_wait(&waiter);
+ break;
+ }
+
+ lttng_waiter_wait(&waiter);
+ }
}
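
The loop above follows the enqueue-then-recheck discipline these wait queues
rely on to avoid missed wake-ups: the waiter publishes itself on the queue
before testing the condition again, so any waker that updates the condition
afterwards is guaranteed to find it queued. Below is a minimal sketch of the
same discipline against an arbitrary flag, using only the API introduced by
this change; done, done_queue, wait_until_done() and signal_done() are
illustrative names, not part of the tree.

#include <urcu/uatomic.h>
#include <common/waiter.h>

/* Illustrative only; done_queue is assumed to be set up once with lttng_wait_queue_init(). */
static struct lttng_wait_queue done_queue;
static int done;

static void wait_until_done(void)
{
	for (;;) {
		struct lttng_waiter waiter;

		lttng_waiter_init(&waiter);
		/* Enqueue first so a concurrent waker cannot miss us. */
		lttng_wait_queue_add(&done_queue, &waiter);
		if (uatomic_read(&done)) {
			/* Condition already met: wake everyone, ourselves included. */
			lttng_wait_queue_wake_all(&done_queue);
			/* Wait for our own teardown before the waiter leaves scope. */
			lttng_waiter_wait(&waiter);
			break;
		}

		lttng_waiter_wait(&waiter);
	}
}

static void signal_done(void)
{
	/* Publish the condition before waking, mirroring the waiter's re-check. */
	uatomic_set(&done, 1);
	lttng_wait_queue_wake_all(&done_queue);
}
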
const char *data);
int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
- uint64_t offset, int timer);
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+ uint64_t offset, bool invoked_by_timer);
#endif /* CONSUMER_METADATA_CACHE_H */
* - metadata_socket_lock
* - Calling lttng_ustconsumer_recv_metadata():
* - channel->metadata_cache->lock
- * - Calling consumer_metadata_cache_flushed():
+ * - Calling consumer_wait_metadata_cache_flushed():
* - channel->timer_lock
* - channel->metadata_cache->lock
*
* they are held while consumer_timer_switch_stop() is
* called.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
if (ret < 0) {
channel->switch_timer_error = 1;
}
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
}
channel->is_live = is_in_live_session;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
+ lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
switch (output) {
case LTTNG_EVENT_SPLICE:
* pointer value.
*/
channel->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);
#include <common/credentials.h>
#include <common/buffer-view.h>
#include <common/dynamic-array.h>
+#include <common/waiter.h>
struct lttng_consumer_local_data;
/* Metadata cache is metadata channel */
struct consumer_metadata_cache *metadata_cache;
+ /*
+ * Wait queue of threads awaiting updates to the metadata stream's flushed position.
+ */
+ struct lttng_wait_queue metadata_pushed_wait_queue;
+
/* For UST metadata periodical flush */
int switch_timer_enabled;
timer_t switch_timer;
*/
consumer_stream_destroy(metadata->metadata_stream, NULL);
metadata->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
send_streams_error:
error_no_stream:
end:
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
if (ret < 0) {
goto error;
}
*/
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
error:
rcu_read_unlock();
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, uint64_t version,
- struct lttng_consumer_channel *channel, int timer, int wait)
+ struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
if (!wait) {
goto end_free;
}
- while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
- DBG("Waiting for metadata to be flushed");
-
- health_code_update();
- usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
- }
+ consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
end_free:
free(metadata_str);
health_code_update();
ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
- version, found_channel, 0, 1);
+ version, found_channel, false, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
goto end;
}
stream->ust_metadata_pushed += write_len;
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
assert(stream->chan->metadata_cache->contents.size >=
stream->ust_metadata_pushed);
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
* pushed out due to concurrent interaction with the session daemon.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer, int wait)
+ struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
health_code_update();
ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, version, channel, timer, wait);
+ key, offset, len, version, channel, invoked_by_timer, wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream);
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, uint64_t version,
- struct lttng_consumer_channel *channel, int timer, int wait);
+ struct lttng_consumer_channel *channel, bool invoked_by_timer,
+ int wait);
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer, int wait);
+ struct lttng_consumer_channel *channel, bool invoked_by_timer,
+ int wait);
enum sync_metadata_status lttng_ustconsumer_sync_metadata(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *metadata);
{
unsigned int i;
- DBG("Beginning of waiter wait period");
- /* Load and test condition before read state */
+ DBG("Beginning of waiter \"wait\" period");
+
+ /* Load and test condition before read state. */
cmm_smp_rmb();
for (i = 0; i < WAIT_ATTEMPTS; i++) {
if (uatomic_read(&waiter->state) != WAITER_WAITING) {
goto skip_futex_wait;
}
+
caa_cpu_relax();
}
+
while (uatomic_read(&waiter->state) == WAITER_WAITING) {
if (!futex_noasync(&waiter->state, FUTEX_WAIT, WAITER_WAITING, NULL, NULL, 0)) {
/*
*/
continue;
}
+
switch (errno) {
case EAGAIN:
/* Value already changed. */
if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) {
break;
}
+
caa_cpu_relax();
}
+
while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) {
poll(NULL, 0, 10);
}
+
assert(uatomic_read(&waiter->state) & WAITER_TEARDOWN);
- DBG("End of waiter wait period");
+ DBG("End of waiter \"wait\" period");
}
/*
* it to free this memory when it sees the WAITER_TEARDOWN flag.
*/
LTTNG_HIDDEN
-void lttng_waiter_wake_up(struct lttng_waiter *waiter)
+void lttng_waiter_wake(struct lttng_waiter *waiter)
{
cmm_smp_mb();
assert(uatomic_read(&waiter->state) == WAITER_WAITING);
abort();
}
}
+
/* Allow teardown of struct urcu_wait memory. */
uatomic_or(&waiter->state, WAITER_TEARDOWN);
}
+
+LTTNG_HIDDEN
+void lttng_wait_queue_init(struct lttng_wait_queue *queue)
+{
+ cds_wfs_init(&queue->stack);
+}
+
+LTTNG_HIDDEN
+void lttng_wait_queue_add(struct lttng_wait_queue *queue,
+ struct lttng_waiter *waiter)
+{
+ (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node);
+}
+
+LTTNG_HIDDEN
+void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue)
+{
+ struct cds_wfs_head *waiters;
+ struct cds_wfs_node *iter, *iter_n;
+
+ /* Move all waiters from the queue to our local stack. */
+ waiters = __cds_wfs_pop_all(&queue->stack);
+
+ /* Wake all waiters in our stack head. */
+ cds_wfs_for_each_blocking_safe(waiters, iter, iter_n) {
+ struct lttng_waiter *waiter =
+ container_of(iter, struct lttng_waiter, wait_queue_node);
+
+ /* Don't wake already running threads. */
+ if (waiter->state & WAITER_RUNNING) {
+ continue;
+ }
+
+ lttng_waiter_wake(waiter);
+ }
+}
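
A note on the wait-queue helpers above: the full memory barrier promised for
lttng_wait_queue_add() comes from cds_wfs_push(), which issues one before
enqueueing the node; its "was the stack empty" return value is deliberately
discarded (hence the (void) cast) since this queue only exposes a broadcast
wake-up. In lttng_wait_queue_wake_all(), __cds_wfs_pop_all() detaches every
queued waiter in a single atomic exchange, so a waiter that enqueues itself
concurrently with a wake-up simply lands on the fresh, empty stack; it is not
lost because it re-checks its wake condition right after enqueueing. Skipping
waiters already flagged WAITER_RUNNING avoids delivering a second wake-up to a
thread that has already resumed, which lttng_waiter_wake() treats as a
programming error (it asserts the waiter is still in the WAITER_WAITING state).
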
+ struct cds_wfs_node wait_queue_node;
int32_t state;
};
+struct lttng_wait_queue {
+ struct cds_wfs_stack stack;
+};
+
LTTNG_HIDDEN
void lttng_waiter_init(struct lttng_waiter *waiter);
void lttng_waiter_wait(struct lttng_waiter *waiter);
/*
- * lttng_waiter_wake_up must only be called by a single waker.
+ * lttng_waiter_wake must only be called by a single waker.
* It is invalid for multiple "wake" operations to be invoked
* on a single waiter without re-initializing it before.
*/
LTTNG_HIDDEN
-void lttng_waiter_wake_up(struct lttng_waiter *waiter);
+void lttng_waiter_wake(struct lttng_waiter *waiter);
+
+LTTNG_HIDDEN
+void lttng_wait_queue_init(struct lttng_wait_queue *queue);
+
+/*
+ * Atomically add a waiter to a wait queue.
+ * A full memory barrier is issued before the waiter is added to the wait
+ * queue.
+ */
+LTTNG_HIDDEN
+void lttng_wait_queue_add(struct lttng_wait_queue *queue,
+ struct lttng_waiter *waiter);
+
+/*
+ * Wake all waiters present in the wait queue and remove them from
+ * the queue.
+ */
+LTTNG_HIDDEN
+void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue);
#endif /* LTTNG_WAITER_H */