/* LTTng ltt-tracer.c atomic lockless buffering scheme Promela model v2
 * Created for the Spin validator.
 * Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
 *
 * TODO : create test cases that will generate an overflow on the offset and
 * counter type. Counter types smaller than a byte should be used.
 *
 * Promela only has unsigned char, no signed char.
 * Because detection of a difference < 0 depends on a signed type, but we want
 * compactness, we also check for values higher than half of the unsigned char
 * range (and consider them negative). The model, by design, does not use
 * offsets or counts higher than 127 because we would then have to use a larger
 * type (short or int).
 */

#define HALF_UCHAR (255/2)
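
/* Byte arithmetic wraps modulo 256, so a difference that would be negative
 * in a signed type lands in the upper half of the unsigned range: 5 - 10
 * yields 251 (> HALF_UCHAR, read as -5), while 3 - 253 yields 6
 * (< HALF_UCHAR, read as +6 across the wrap). Hypothetical helpers (not
 * part of the model) naming the comparisons used throughout:
 */
#define SEQ_GE(a, b)	(((a) - (b)) < HALF_UCHAR)	/* a >= b, wrap-safe */
#define SEQ_GT(a, b)	((a) != (b) && SEQ_GE(a, b))	/* a > b, wrap-safe */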

/* NUMPROCS 4 : causes event loss with some reader timings.
 * e.g. 3 events, 1 switch, 1 event (lost, buffer full), read 1 subbuffer
 */
#define NUMPROCS 4

/* NUMPROCS 3 : does not cause event loss because buffers are big enough.
 * e.g. 3 events, 1 switch, read 1 subbuffer
 */

#define NUMSWITCH 1
#define BUFSIZE 4
#define NR_SUBBUFS 2
#define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS)
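
/* With this geometry, SUBBUF_SIZE evaluates to 4 / 2 = 2: the four-slot
 * buffer is split into two sub-buffers of two slots each (values assumed
 * from the NUMPROCS 4 scenario above, where 3 events plus 1 switch fill
 * the buffer).
 */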

/* Writer counters */
byte write_off = 0;
byte commit_count[NR_SUBBUFS];

/* Reader counters */
byte read_off = 0;
byte retrieve_count[NR_SUBBUFS];

byte events_lost = 0;
byte refcount = 0;

bool deliver = 0;

/* Buffer slot in-use bit. Detects racy use (more than a single process
 * accessing a slot at any given step).
 */
bool buffer_use[BUFSIZE];

/* Proceed to a sub-buffer switch if needed.
 * Used in a periodical timer interrupt to fill and ship the current subbuffer
 * to the reader so we can guarantee a steady flow. If a subbuffer is
 * completely empty, do not switch.
 * Also used as a "finalize" operation to complete the last subbuffer after
 * all writers have finished, so the last subbuffer can be read by the reader.
 */
proctype switcher()
{
	byte prev_off, new_off, tmp_commit;
	byte size;

cmpxchg_loop:
	atomic {
		prev_off = write_off;
		size = SUBBUF_SIZE - (prev_off % SUBBUF_SIZE);
		new_off = prev_off + size;
		if
		:: (new_off - read_off > BUFSIZE && new_off - read_off < HALF_UCHAR)
			|| size == SUBBUF_SIZE ->
			refcount = refcount - 1;
			goto not_needed;
		:: else -> skip
		fi;
	}
	atomic {
		if
		:: prev_off != write_off -> goto cmpxchg_loop
		:: else -> write_off = new_off;
		fi;
	}

	atomic {
		tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
		commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
		if
		:: (((prev_off / BUFSIZE) * BUFSIZE) / NR_SUBBUFS) + SUBBUF_SIZE
			- tmp_commit == 0 -> deliver = 1
		:: else -> skip
		fi;
	}
	refcount = refcount - 1;
not_needed:
	skip;
}
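
/* The two atomic blocks at the top of switcher() and tracer() emulate a
 * local_cmpxchg(): the first snapshots write_off and computes the new offset,
 * the second re-checks the snapshot and installs new_off atomically, retrying
 * from cmpxchg_loop on interference. A hypothetical inline (not used by the
 * model) capturing that compare-and-swap step:
 */
inline cmpxchg(var, old, new, ret)
{
	atomic {
		if
		:: var == old -> var = new; ret = 1
		:: else -> ret = 0
		fi
	}
}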

/* tracer
 * Writes 1 byte of information in the buffer at the current
 * "write_off" position and then increments the commit_count of the sub-buffer
 * the information has been written to.
 */
proctype tracer()
{
	byte size = 1;
	byte prev_off, new_off, tmp_commit;
	byte i;

cmpxchg_loop:
	atomic {
		prev_off = write_off;
		new_off = prev_off + size;
	}

	atomic {
		if
		:: new_off - read_off > BUFSIZE && new_off - read_off < HALF_UCHAR ->
			goto lost
		:: else -> skip
		fi;
	}

	atomic {
		if
		:: prev_off != write_off -> goto cmpxchg_loop
		:: else -> write_off = new_off;
		fi;
		i = 0;
		do
		:: i < size ->
			assert(buffer_use[(prev_off + i) % BUFSIZE] == 0);
			buffer_use[(prev_off + i) % BUFSIZE] = 1;
			i++
		:: i >= size -> break
		od;
	}

	/* Writing to buffer... */

	atomic {
		i = 0;
		do
		:: i < size ->
			buffer_use[(prev_off + i) % BUFSIZE] = 0;
			i++
		:: i >= size -> break
		od;
		tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
		commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
		if
		:: (((prev_off / BUFSIZE) * BUFSIZE) / NR_SUBBUFS) + SUBBUF_SIZE
			- tmp_commit == 0 -> deliver = 1
		:: else -> skip
		fi;
	}

	goto end;
lost:
	events_lost = events_lost + 1;
end:
	refcount = refcount - 1;
}
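
/* Illustrative helper (not used by the model) naming the sub-buffer index
 * arithmetic that recurs above: with BUFSIZE 4 and SUBBUF_SIZE 2, offset 5
 * maps to slot 5 % 4 = 1, which lies in sub-buffer (5 % 4) / 2 = 0.
 */
#define SUBBUF_INDEX(off)	(((off) % BUFSIZE) / SUBBUF_SIZE)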

/* reader
 * Reads the information sub-buffer by sub-buffer when available.
 *
 * Reads the information as soon as it is ready, or may be delayed by
 * an asynchronous delivery. Being modeled as a process ensures all cases
 * (scheduled very quickly or very late, causing event loss) are covered.
 * Only one reader per buffer (normally ensured by a mutex). This is modeled
 * by using a single reader process.
 */
proctype reader()
{
	byte i;

	do
	:: (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) > 0
		&& (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) < HALF_UCHAR
		&& (commit_count[(read_off % BUFSIZE) / SUBBUF_SIZE]
			- SUBBUF_SIZE - (((read_off / BUFSIZE) * BUFSIZE) / NR_SUBBUFS)
			== 0) ->
		atomic {
			i = 0;
			do
			:: i < SUBBUF_SIZE ->
				assert(buffer_use[(read_off + i) % BUFSIZE] == 0);
				buffer_use[(read_off + i) % BUFSIZE] = 1;
				i++
			:: i >= SUBBUF_SIZE -> break
			od;
		}

		/* Reading from buffer... */

		atomic {
			i = 0;
			do
			:: i < SUBBUF_SIZE ->
				buffer_use[(read_off + i) % BUFSIZE] = 0;
				i++
			:: i >= SUBBUF_SIZE -> break
			od;
			read_off = read_off + SUBBUF_SIZE;
		}
	:: read_off >= (NUMPROCS - events_lost) -> break
	od;
}
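
/* Illustrative predicate (not part of the model) naming the commit test in
 * the reader's guard: a sub-buffer is deliverable once its commit count
 * reaches the per-round base plus SUBBUF_SIZE. E.g. with read_off = 0 and
 * commit_count[0] = 2, the test is 2 - 2 - 0 == 0, so the sub-buffer is
 * readable.
 */
#define SUBBUF_COMMITTED(off, cc) \
	((cc) - SUBBUF_SIZE - ((((off) / BUFSIZE) * BUFSIZE) / NR_SUBBUFS) == 0)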

/* cleaner
 * Waits for all tracer and switcher processes to finish before finalizing
 * the buffer. Only after that will the reader be allowed to read the
 * last sub-buffer.
 */
proctype cleaner()
{
	do
	:: refcount == 0 ->
		refcount = refcount + 1;
		run switcher();	/* Finalize the last sub-buffer so it can be read. */
		break
	od;
}

init
{
	byte i = 0;
	byte j = 0;
	byte commit_sum = 0;

	atomic {
		i = 0;
		do
		:: i < NR_SUBBUFS ->
			commit_count[i] = 0;
			retrieve_count[i] = 0;
			i++
		:: i >= NR_SUBBUFS -> break
		od;
		i = 0;
		do
		:: i < BUFSIZE ->
			buffer_use[i] = 0;
			i++
		:: i >= BUFSIZE -> break
		od;
		run reader();
		run cleaner();
		i = 0;
		do
		:: i < NUMPROCS ->
			refcount = refcount + 1;
			run tracer();
			i++
		:: i >= NUMPROCS -> break
		od;
		i = 0;
		do
		:: i < NUMSWITCH ->
			refcount = refcount + 1;
			run switcher();
			i++
		:: i >= NUMSWITCH -> break
		od;
	}
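
	/* At this point NUMPROCS + NUMSWITCH references are held on the buffer:
	 * each tracer and switcher drops its reference when it terminates, and
	 * the cleaner launches the finalizing switch only once refcount falls
	 * back to 0.
	 */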

	/* The writer head must always be greater than or equal to the reader
	 * head.
	 */
	assert(write_off - read_off >= 0 && write_off - read_off < HALF_UCHAR);
	j = 0;
	commit_sum = 0;
	do
	:: j < NR_SUBBUFS ->
		commit_sum = commit_sum + commit_count[j];
		/* The commit count of a particular sub-buffer must always be
		 * greater than or equal to the retrieve_count of that sub-buffer.
		 * assert(commit_count[j] - retrieve_count[j] >= 0 &&
		 *	commit_count[j] - retrieve_count[j] < HALF_UCHAR);
		 */
		j++
	:: j >= NR_SUBBUFS -> break
	od;

	/* The sum of all sub-buffer commit counts must always be less than or
	 * equal to the writer head, because space must be reserved before it is
	 * written to and then committed.
	 */
	assert(write_off - commit_sum >= 0 && write_off - commit_sum < HALF_UCHAR);

	/* If we have fewer writers than the buffer space available, we should
	 * not lose any events.
	 */
	assert(NUMPROCS + NUMSWITCH > BUFSIZE || events_lost == 0);
}
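
/* A typical exhaustive safety check with Spin (file name is illustrative):
 *   spin -a buffer.spin
 *   cc -o pan pan.c
 *   ./pan
 * Assertion violations (event loss, racy buffer_use access, ordering
 * invariants) are reported with an error trail replayable via spin -t.
 */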