#define LTTNG_UST_RB_SIG_READ SIGRTMIN + 1
#define LTTNG_UST_RB_SIG_TEARDOWN SIGRTMIN + 2
#define CLOCKID CLOCK_MONOTONIC
+#define LTTNG_UST_RING_BUFFER_GET_RETRY 10
+#define LTTNG_UST_RING_BUFFER_RETRY_DELAY_MS 10
/*
* Use POSIX SHM: shm_open(3) and shm_unlink(3).
struct channel *chan = shmp(handle, buf->backend.chan);
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
- int ret;
- int finalized;
+ int ret, finalized, nr_retry = LTTNG_UST_RING_BUFFER_GET_RETRY;
retry:
finalized = CMM_ACCESS_ONCE(buf->finalized);
/*
* Check that the subbuffer we are trying to consume has been
- * already fully committed.
+ * already fully committed. There are a few causes that can make
+ * this unavailability situation occur:
+ *
+ * Temporary (short-term) situation:
+ * - Application is running on a different CPU, between reserve
+ * and commit ring buffer operations,
+ * - Application is preempted between reserve and commit ring
+ * buffer operations,
+ *
+ * Long-term situation:
+ * - Application is stopped (SIGSTOP) between reserve and commit
+ * ring buffer operations. Could eventually be resumed by
+ * SIGCONT.
+ * - Application is killed (SIGTERM, SIGINT, SIGKILL) between
+ * reserve and commit ring buffer operation.
+ *
+ * From a consumer perspective, handling short-term
+ * unavailability situations is performed by retrying a few
+ * times after a delay. Handling long-term unavailability
+ * situations is handled by failing to get the sub-buffer.
+ *
+ * In all of those situations, if the application is taking a
+ * long time to perform its commit after ring buffer space
+ * reservation, we can end up in a situation where the producer
+ * will fill the ring buffer and try to write into the same
+ * sub-buffer again (which has a missing commit). This is
+ * handled by the producer in the sub-buffer switch handling
+ * code of the reserve routine by detecting unbalanced
+ * reserve/commit counters and discarding all further events
+ * until the situation is resolved in those situations. Two
+ * scenarios can occur:
+ *
+ * 1) The application causing the reserve/commit counters to be
+ * unbalanced has been terminated. In this situation, all
+ * further events will be discarded in the buffers, and no
+ * further buffer data will be readable by the consumer
+ * daemon. Tearing down the UST tracing session and starting
+ * anew is a work-around for those situations. Note that this
+ * only affects per-UID tracing. In per-PID tracing, the
+ * application vanishes with the termination, and therefore
+ * no more data needs to be written to the buffers.
+ * 2) The application causing the unbalance has been delayed for
+ * a long time, but will eventually try to increment the
+ * commit counter after eventually writing to the sub-buffer.
+ * This situation can cause events to be discarded until the
+ * application resumes its operations.
*/
if (((commit_count - chan->backend.subbuf_size)
& chan->commit_count_mask)
- (buf_trunc(consumed, chan)
>> chan->backend.num_subbuf_order)
- != 0)
- goto nodata;
+ != 0) {
+ if (nr_retry-- > 0) {
+ if (nr_retry <= (LTTNG_UST_RING_BUFFER_GET_RETRY >> 1))
+ (void) poll(NULL, 0, LTTNG_UST_RING_BUFFER_RETRY_DELAY_MS);
+ goto retry;
+ } else {
+ goto nodata;
+ }
+ }
/*
* Check that we are not about to read the same subbuffer in
* the writer is getting access to a subbuffer we were trying to get
* access to. Also checks that the "consumed" buffer count we are
* looking for matches the one contained in the subbuffer id.
+ *
+ * The short-lived race window described here can be affected by
+ * application signals and preemption, thus requiring to bound
+ * the loop to a maximum number of retry.
*/
ret = update_read_sb_index(config, &buf->backend, &chan->backend,
consumed_idx, buf_trunc_val(consumed, chan),
handle);
- if (ret)
- goto retry;
+ if (ret) {
+ if (nr_retry-- > 0) {
+ if (nr_retry <= (LTTNG_UST_RING_BUFFER_GET_RETRY >> 1))
+ (void) poll(NULL, 0, LTTNG_UST_RING_BUFFER_RETRY_DELAY_MS);
+ goto retry;
+ } else {
+ goto nodata;
+ }
+ }
subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
buf->get_subbuf_consumed = consumed;