union v_atomic records_count; /* Number of records written */
union v_atomic records_overrun; /* Number of overwritten records */
wait_queue_head_t read_wait; /* reader buffer-level wait queue */
+ wait_queue_head_t write_wait; /* writer buffer-level wait queue (for metadata only) */
int finalized; /* buffer has been finalized */
struct timer_list switch_timer; /* timer for periodical switch */
struct timer_list read_timer; /* timer for read poll */
}
init_waitqueue_head(&buf->read_wait);
+ init_waitqueue_head(&buf->write_wait);
raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
/*
/**
* lib_ring_buffer_put_snapshot - move consumed counter forward
+ *
+ * Should only be called from consumer context.
* @buf: ring buffer
* @consumed_new: new consumed count value
*/
while ((long) consumed - (long) consumed_new < 0)
consumed = atomic_long_cmpxchg(&buf->consumed, consumed,
consumed_new);
+ /* Wake-up the metadata producer */
+ wake_up_interruptible(&buf->write_wait);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer);
* we need to bail out after timeout or being
* interrupted.
*/
- waitret = wait_event_interruptible_timeout(*chan->ops->get_reader_wait_queue(chan->chan),
+ waitret = wait_event_interruptible_timeout(*chan->ops->get_writer_buf_wait_queue(chan->chan, -1),
({
ret = chan->ops->event_reserve(&ctx, 0);
ret != -ENOBUFS || !ret;
* may change due to concurrent writes.
*/
size_t (*packet_avail_size)(struct channel *chan);
- wait_queue_head_t *(*get_reader_wait_queue)(struct channel *chan);
+ wait_queue_head_t *(*get_writer_buf_wait_queue)(struct channel *chan, int cpu);
wait_queue_head_t *(*get_hp_wait_queue)(struct channel *chan);
int (*is_finalized)(struct channel *chan);
int (*is_disabled)(struct channel *chan);
}
static
-wait_queue_head_t *ltt_get_reader_wait_queue(struct channel *chan)
+wait_queue_head_t *ltt_get_writer_buf_wait_queue(struct channel *chan, int cpu)
{
- return &chan->read_wait;
+ struct lib_ring_buffer *buf = channel_get_ring_buffer(&client_config,
+ chan, cpu);
+ return &buf->write_wait;
}
static
.event_commit = ltt_event_commit,
.event_write = ltt_event_write,
.packet_avail_size = NULL, /* Would be racy anyway */
- .get_reader_wait_queue = ltt_get_reader_wait_queue,
+ .get_writer_buf_wait_queue = ltt_get_writer_buf_wait_queue,
.get_hp_wait_queue = ltt_get_hp_wait_queue,
.is_finalized = ltt_is_finalized,
.is_disabled = ltt_is_disabled,
header->content_size = data_size * CHAR_BIT; /* in bits */
header->packet_size = PAGE_ALIGN(data_size) * CHAR_BIT; /* in bits */
- records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf);
+ /*
+ * We do not care about the records lost count, because the metadata
+ * channel waits and retry.
+ */
+ (void) lib_ring_buffer_get_records_lost_full(&client_config, buf);
records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
WARN_ON_ONCE(records_lost != 0);
}
static
-wait_queue_head_t *ltt_get_reader_wait_queue(struct channel *chan)
+wait_queue_head_t *ltt_get_writer_buf_wait_queue(struct channel *chan, int cpu)
{
- return &chan->read_wait;
+ struct lib_ring_buffer *buf = channel_get_ring_buffer(&client_config,
+ chan, cpu);
+ return &buf->write_wait;
}
static
.event_commit = ltt_event_commit,
.event_write = ltt_event_write,
.packet_avail_size = ltt_packet_avail_size,
- .get_reader_wait_queue = ltt_get_reader_wait_queue,
+ .get_writer_buf_wait_queue = ltt_get_writer_buf_wait_queue,
.get_hp_wait_queue = ltt_get_hp_wait_queue,
.is_finalized = ltt_is_finalized,
.is_disabled = ltt_is_disabled,