X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;ds=sidebyside;f=libringbuffer%2Fring_buffer_frontend.c;h=2b4ecc24b1c44769afe455f40773b6c6db5233c8;hb=a6352fd40a2090fd883a6c369144bf405c9e9ec4;hp=5ceda871a907912d517c506f103b8686a0b4fea5;hpb=14641debd03ba299bd06040cb62e0dbdef7fac81;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 5ceda871..2b4ecc24 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -38,13 +38,16 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include +#include #include +#include +#include "smp.h" #include "config.h" #include "backend.h" #include "frontend.h" -#include "iterator.h" -#include "nohz.h" +#include "shm.h" /* * Internal structure representing offsets to use at a sub-buffer switch. @@ -56,20 +59,7 @@ struct switch_offsets { switch_old_end:1; }; -#ifdef CONFIG_NO_HZ -enum tick_nohz_val { - TICK_NOHZ_STOP, - TICK_NOHZ_FLUSH, - TICK_NOHZ_RESTART, -}; - -static ATOMIC_NOTIFIER_HEAD(tick_nohz_notifier); -#endif /* CONFIG_NO_HZ */ - -static DEFINE_PER_CPU(spinlock_t, ring_buffer_nohz_lock); - -DEFINE_PER_CPU(unsigned int, lib_ring_buffer_nesting); -EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting); +__thread unsigned int lib_ring_buffer_nesting; static void lib_ring_buffer_print_errors(struct channel *chan, @@ -80,11 +70,11 @@ void lib_ring_buffer_print_errors(struct channel *chan, */ void lib_ring_buffer_free(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu); - kfree(buf->commit_hot); - kfree(buf->commit_cold); + free(shmp(buf->commit_hot)); + free(shmp(buf->commit_cold)); lib_ring_buffer_backend_free(&buf->backend); } @@ -100,7 +90,7 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) */ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned int i; @@ -108,15 +98,14 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) * Reset iterator first. It will put the subbuffer if it currently holds * it. */ - lib_ring_buffer_iterator_reset(buf); v_set(config, &buf->offset, 0); for (i = 0; i < chan->backend.num_subbuf; i++) { - v_set(config, &buf->commit_hot[i].cc, 0); - v_set(config, &buf->commit_hot[i].seq, 0); - v_set(config, &buf->commit_cold[i].cc_sb, 0); + v_set(config, &shmp(buf->commit_hot)[i].cc, 0); + v_set(config, &shmp(buf->commit_hot)[i].seq, 0); + v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0); } - atomic_long_set(&buf->consumed, 0); - atomic_set(&buf->record_disabled, 0); + uatomic_set(&buf->consumed, 0); + uatomic_set(&buf->record_disabled, 0); v_set(config, &buf->last_tsc, 0); lib_ring_buffer_backend_reset(&buf->backend); /* Don't reset number of active readers */ @@ -127,7 +116,6 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) v_set(config, &buf->records_overrun, 0); buf->finalized = 0; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_reset); /** * channel_reset - Reset channel to initial values. @@ -143,21 +131,20 @@ void channel_reset(struct channel *chan) /* * Reset iterators first. Will put the subbuffer if held for reading. */ - channel_iterator_reset(chan); - atomic_set(&chan->record_disabled, 0); + uatomic_set(&chan->record_disabled, 0); /* Don't reset commit_count_mask, still valid */ channel_backend_reset(&chan->backend); /* Don't reset switch/read timer interval */ /* Don't reset notifiers and notifier enable bits */ /* Don't reset reader reference count */ } -EXPORT_SYMBOL_GPL(channel_reset); /* * Must be called under cpu hotplug protection. */ int lib_ring_buffer_create(struct lib_ring_buffer *buf, - struct channel_backend *chanb, int cpu) + struct channel_backend *chanb, int cpu, + struct shm_header *shm_header) { const struct lib_ring_buffer_config *config = chanb->config; struct channel *chan = caa_container_of(chanb, struct channel, backend); @@ -171,39 +158,29 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, if (buf->backend.allocated) return 0; - /* - * Paranoia: per cpu dynamic allocation is not officially documented as - * zeroing the memory, so let's do it here too, just in case. - */ - memset(buf, 0, sizeof(*buf)); - - ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu); + ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, + cpu, shm_header); if (ret) return ret; - buf->commit_hot = - kzalloc_node(ALIGN(sizeof(*buf->commit_hot) - * chan->backend.num_subbuf, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); - if (!buf->commit_hot) { + set_shmp(&buf->commit_hot, + zalloc_shm(shm_header, + sizeof(*buf->commit_hot) * chan->backend.num_subbuf)); + if (!shmp(buf->commit_hot)) { ret = -ENOMEM; goto free_chanbuf; } - buf->commit_cold = - kzalloc_node(ALIGN(sizeof(*buf->commit_cold) - * chan->backend.num_subbuf, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); - if (!buf->commit_cold) { + set_shmp(&buf->commit_cold, + zalloc_shm(shm_header, + sizeof(*buf->commit_cold) * chan->backend.num_subbuf)); + if (!shmp(buf->commit_cold)) { ret = -ENOMEM; goto free_commit; } num_subbuf = chan->backend.num_subbuf; - init_waitqueue_head(&buf->read_wait); - raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); + //init_waitqueue_head(&buf->read_wait); /* * Write the subbuffer header for first subbuffer so we know the total @@ -211,38 +188,24 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, */ subbuf_header_size = config->cb.subbuffer_header_size(); v_set(config, &buf->offset, subbuf_header_size); - subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id); - tsc = config->cb.ring_buffer_clock_read(buf->backend.chan); + subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id); + tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan)); config->cb.buffer_begin(buf, tsc, 0); - v_add(config, subbuf_header_size, &buf->commit_hot[0].cc); + v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc); if (config->cb.buffer_create) { ret = config->cb.buffer_create(buf, priv, cpu, chanb->name); if (ret) goto free_init; } - - /* - * Ensure the buffer is ready before setting it to allocated and setting - * the cpumask. - * Used for cpu hotplug vs cpumask iteration. - */ - smp_wmb(); buf->backend.allocated = 1; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - CHAN_WARN_ON(chan, cpumask_test_cpu(cpu, - chan->backend.cpumask)); - cpumask_set_cpu(cpu, chan->backend.cpumask); - } - return 0; /* Error handling */ free_init: - kfree(buf->commit_cold); + /* commit_cold will be freed by shm teardown */ free_commit: - kfree(buf->commit_hot); + /* commit_hot will be freed by shm teardown */ free_chanbuf: lib_ring_buffer_backend_free(&buf->backend); return ret; @@ -251,55 +214,52 @@ free_chanbuf: static void switch_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; /* * Only flush buffers periodically if readers are active. */ - if (atomic_long_read(&buf->active_readers)) + if (uatomic_read(&buf->active_readers)) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->switch_timer, - jiffies + chan->switch_timer_interval); - else - mod_timer(&buf->switch_timer, - jiffies + chan->switch_timer_interval); + //TODO timers + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // mod_timer_pinned(&buf->switch_timer, + // jiffies + chan->switch_timer_interval); + //else + // mod_timer(&buf->switch_timer, + // jiffies + chan->switch_timer_interval); } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; - init_timer(&buf->switch_timer); - buf->switch_timer.function = switch_buffer_timer; - buf->switch_timer.expires = jiffies + chan->switch_timer_interval; - buf->switch_timer.data = (unsigned long)buf; - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - add_timer_on(&buf->switch_timer, buf->backend.cpu); - else - add_timer(&buf->switch_timer); + //TODO + //init_timer(&buf->switch_timer); + //buf->switch_timer.function = switch_buffer_timer; + //buf->switch_timer.expires = jiffies + chan->switch_timer_interval; + //buf->switch_timer.data = (unsigned long)buf; + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // add_timer_on(&buf->switch_timer, buf->backend.cpu); + //else + // add_timer(&buf->switch_timer); buf->switch_timer_enabled = 1; } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); if (!chan->switch_timer_interval || !buf->switch_timer_enabled) return; - del_timer_sync(&buf->switch_timer); + //TODO + //del_timer_sync(&buf->switch_timer); buf->switch_timer_enabled = 0; } @@ -309,31 +269,30 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) static void read_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; CHAN_WARN_ON(chan, !buf->backend.allocated); - if (atomic_long_read(&buf->active_readers) + if (uatomic_read(&buf->active_readers) && lib_ring_buffer_poll_deliver(config, buf, chan)) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); + //TODO + //wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&chan->read_wait); } - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->read_timer, - jiffies + chan->read_timer_interval); - else - mod_timer(&buf->read_timer, - jiffies + chan->read_timer_interval); + //TODO + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // mod_timer_pinned(&buf->read_timer, + // jiffies + chan->read_timer_interval); + //else + // mod_timer(&buf->read_timer, + // jiffies + chan->read_timer_interval); } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -341,24 +300,22 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) || buf->read_timer_enabled) return; - init_timer(&buf->read_timer); - buf->read_timer.function = read_buffer_timer; - buf->read_timer.expires = jiffies + chan->read_timer_interval; - buf->read_timer.data = (unsigned long)buf; + //TODO + //init_timer(&buf->read_timer); + //buf->read_timer.function = read_buffer_timer; + //buf->read_timer.expires = jiffies + chan->read_timer_interval; + //buf->read_timer.data = (unsigned long)buf; - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - add_timer_on(&buf->read_timer, buf->backend.cpu); - else - add_timer(&buf->read_timer); + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // add_timer_on(&buf->read_timer, buf->backend.cpu); + //else + // add_timer(&buf->read_timer); buf->read_timer_enabled = 1; } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -366,202 +323,34 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) || !buf->read_timer_enabled) return; - del_timer_sync(&buf->read_timer); + //TODO + //del_timer_sync(&buf->read_timer); /* * do one more check to catch data that has been written in the last * timer period. */ if (lib_ring_buffer_poll_deliver(config, buf, chan)) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); + //TODO + //wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&chan->read_wait); } buf->read_timer_enabled = 0; } -#ifdef CONFIG_HOTPLUG_CPU -/** - * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback - * @nb: notifier block - * @action: hotplug action to take - * @hcpu: CPU number - * - * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) - */ -static -int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, - unsigned long action, - void *hcpu) -{ - unsigned int cpu = (unsigned long)hcpu; - struct channel *chan = caa_container_of(nb, struct channel, - cpu_hp_notifier); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = chan->backend.config; - - if (!chan->cpu_hp_enable) - return NOTIFY_DONE; - - CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); - - switch (action) { - case CPU_DOWN_FAILED: - case CPU_DOWN_FAILED_FROZEN: - case CPU_ONLINE: - case CPU_ONLINE_FROZEN: - wake_up_interruptible(&chan->hp_wait); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - return NOTIFY_OK; - - case CPU_DOWN_PREPARE: - case CPU_DOWN_PREPARE_FROZEN: - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - return NOTIFY_OK; - - case CPU_DEAD: - case CPU_DEAD_FROZEN: - /* - * Performing a buffer switch on a remote CPU. Performed by - * the CPU responsible for doing the hotunplug after the target - * CPU stopped running completely. Ensures that all data - * from that remote CPU is flushed. - */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - return NOTIFY_OK; - - default: - return NOTIFY_DONE; - } -} -#endif - -#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) -/* - * For per-cpu buffers, call the reader wakeups before switching the buffer, so - * that wake-up-tracing generated events are flushed before going idle (in - * tick_nohz). We test if the spinlock is locked to deal with the race where - * readers try to sample the ring buffer before we perform the switch. We let - * the readers retry in that case. If there is data in the buffer, the wake up - * is going to forbid the CPU running the reader thread from going idle. - */ -static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, - unsigned long val, - void *data) -{ - struct channel *chan = caa_container_of(nb, struct channel, - tick_nohz_notifier); - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer *buf; - int cpu = smp_processor_id(); - - if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) { - /* - * We don't support keeping the system idle with global buffers - * and streaming active. In order to do so, we would need to - * sample a non-nohz-cpumask racelessly with the nohz updates - * without adding synchronization overhead to nohz. Leave this - * use-case out for now. - */ - return 0; - } - - buf = channel_get_ring_buffer(config, chan, cpu); - switch (val) { - case TICK_NOHZ_FLUSH: - raw_spin_lock(&buf->raw_tick_nohz_spinlock); - if (config->wakeup == RING_BUFFER_WAKEUP_BY_TIMER - && chan->read_timer_interval - && atomic_long_read(&buf->active_readers) - && (lib_ring_buffer_poll_deliver(config, buf, chan) - || lib_ring_buffer_pending_data(config, buf, chan))) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); - } - if (chan->switch_timer_interval) - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - raw_spin_unlock(&buf->raw_tick_nohz_spinlock); - break; - case TICK_NOHZ_STOP: - spin_lock(&__get_cpu_var(ring_buffer_nohz_lock)); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock)); - break; - case TICK_NOHZ_RESTART: - spin_lock(&__get_cpu_var(ring_buffer_nohz_lock)); - lib_ring_buffer_start_read_timer(buf); - lib_ring_buffer_start_switch_timer(buf); - spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock)); - break; - } - - return 0; -} - -void notrace lib_ring_buffer_tick_nohz_flush(void) -{ - atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_FLUSH, - NULL); -} - -void notrace lib_ring_buffer_tick_nohz_stop(void) -{ - atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_STOP, - NULL); -} - -void notrace lib_ring_buffer_tick_nohz_restart(void) -{ - atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_RESTART, - NULL); -} -#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ - -/* - * Holds CPU hotplug. - */ static void channel_unregister_notifiers(struct channel *chan) { const struct lib_ring_buffer_config *config = chan->backend.config; int cpu; - channel_iterator_unregister_notifiers(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { -#ifdef CONFIG_NO_HZ - /* - * Remove the nohz notifier first, so we are certain we stop - * the timers. - */ - atomic_notifier_chain_unregister(&tick_nohz_notifier, - &chan->tick_nohz_notifier); - /* - * ring_buffer_nohz_lock will not be needed below, because - * we just removed the notifiers, which were the only source of - * concurrency. - */ -#endif /* CONFIG_NO_HZ */ -#ifdef CONFIG_HOTPLUG_CPU - get_online_cpus(); - chan->cpu_hp_enable = 0; - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - } - put_online_cpus(); - unregister_cpu_notifier(&chan->cpu_hp_notifier); -#else for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); + struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; + lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); } -#endif } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lib_ring_buffer *buf = shmp(chan->backend.buf); lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); @@ -571,9 +360,8 @@ static void channel_unregister_notifiers(struct channel *chan) static void channel_free(struct channel *chan) { - channel_iterator_free(chan); channel_backend_free(&chan->backend); - kfree(chan); + free(chan); } /** @@ -590,6 +378,7 @@ static void channel_free(struct channel *chan) * padding to let readers get those sub-buffers. * Used for live streaming. * @read_timer_interval: Time interval (in us) to wake up pending readers. + * @shmid: shared memory ID (output) * * Holds cpu hotplug. * Returns NULL on failure. @@ -598,79 +387,104 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, const char *name, void *priv, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, - unsigned int read_timer_interval) + unsigned int read_timer_interval, + int *shmid) { int ret, cpu; struct channel *chan; + size_t shmsize, bufshmsize; + struct shm_header *shm_header; + unsigned long num_subbuf_alloc; if (lib_ring_buffer_check_config(config, switch_timer_interval, read_timer_interval)) return NULL; - chan = kzalloc(sizeof(struct channel), GFP_KERNEL); - if (!chan) + /* Calculate the shm allocation layout */ + shmsize = sizeof(struct shm_header); + shmsize += sizeof(struct channel); + + /* Per-cpu buffer size: control (prior to backend) */ + bufshmsize = sizeof(struct lib_ring_buffer); + shmsize += bufshmsize * num_possible_cpus(); + + /* Per-cpu buffer size: backend */ + /* num_subbuf + 1 is the worse case */ + num_subbuf_alloc = num_subbuf + 1; + bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc; + bufshmsize += subbuf_size * (num_subbuf_alloc); + bufshmsize += (sizeof(struct lib_ring_buffer_backend_pages) + subbuf_size) * num_subbuf_alloc; + bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf; + shmsize += bufshmsize * num_possible_cpus(); + + /* Per-cpu buffer size: control (after backend) */ + bufshmsize += sizeof(struct commit_counters_hot) * num_subbuf; + bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf; + + /* Allocate shm */ + *shmid = shmget(getpid(), shmsize, IPC_CREAT | IPC_EXCL | 0700); + if (*shmid < 0) { + if (errno == EINVAL) + ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased."); + else + PERROR("shmget"); return NULL; + } - ret = channel_backend_init(&chan->backend, name, config, priv, - subbuf_size, num_subbuf); - if (ret) - goto error; + shm_header = shmat(*shmid, NULL, 0); + if (shm_header == (void *) -1) { + perror("shmat"); + goto destroy_shmem; + } - ret = channel_iterator_init(chan); + /* Already mark the shared memory for destruction. This will occur only + * when all users have detached. + */ + ret = shmctl(*shmid, IPC_RMID, NULL); + if (ret == -1) { + perror("shmctl"); + goto destroy_shmem; + } + + shm_header->magic = SHM_MAGIC; + shm_header->major = SHM_MAJOR; + shm_header->major = SHM_MINOR; + shm_header->bits_per_long = CAA_BITS_PER_LONG; + shm_header->shm_size = shmsize; + shm_header->shm_allocated = sizeof(struct shm_header); + + chan = zalloc_shm(shm_header, sizeof(struct channel)); + if (!chan) + goto destroy_shmem; + set_shmp(shm_header->chan, chan); + + ret = channel_backend_init(&chan->backend, name, config, priv, + subbuf_size, num_subbuf, shm_header); if (ret) - goto error_free_backend; + goto destroy_shmem; chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order); - chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval); - chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); - kref_init(&chan->ref); - init_waitqueue_head(&chan->read_wait); - init_waitqueue_head(&chan->hp_wait); + //TODO + //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval); + //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); + urcu_ref_init(&chan->ref); + //TODO + //init_waitqueue_head(&chan->read_wait); + //init_waitqueue_head(&chan->hp_wait); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { -#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) - /* Only benefit from NO_HZ idle with per-cpu buffers for now. */ - chan->tick_nohz_notifier.notifier_call = - ring_buffer_tick_nohz_callback; - chan->tick_nohz_notifier.priority = ~0U; - atomic_notifier_chain_register(&tick_nohz_notifier, - &chan->tick_nohz_notifier); -#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ - /* * In case of non-hotplug cpu, if the ring-buffer is allocated * in early initcall, it will not be notified of secondary cpus. * In that off case, we need to allocate for all possible cpus. */ -#ifdef CONFIG_HOTPLUG_CPU - chan->cpu_hp_notifier.notifier_call = - lib_ring_buffer_cpu_hp_callback; - chan->cpu_hp_notifier.priority = 6; - register_cpu_notifier(&chan->cpu_hp_notifier); - - get_online_cpus(); - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); - } - chan->cpu_hp_enable = 1; - put_online_cpus(); -#else for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); + struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); } -#endif } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lib_ring_buffer *buf = shmp(chan->backend.buf); lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); @@ -678,18 +492,18 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, return chan; -error_free_backend: - channel_backend_free(&chan->backend); -error: - kfree(chan); +destroy_shmem: + ret = shmctl(*shmid, IPC_RMID, NULL); + if (ret == -1) { + perror("shmctl"); + } return NULL; } -EXPORT_SYMBOL_GPL(channel_create); static -void channel_release(struct kref *kref) +void channel_release(struct urcu_ref *ref) { - struct channel *chan = caa_container_of(kref, struct channel, ref); + struct channel *chan = caa_container_of(ref, struct channel, ref); channel_free(chan); } @@ -713,13 +527,8 @@ void *channel_destroy(struct channel *chan) channel_unregister_notifiers(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - /* - * No need to hold cpu hotplug, because all notifiers have been - * unregistered. - */ for_each_channel_cpu(cpu, chan) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); + struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, @@ -730,12 +539,12 @@ void *channel_destroy(struct channel *chan) /* * Perform flush before writing to finalized. */ - smp_wmb(); + cmm_smp_wmb(); CMM_ACCESS_ONCE(buf->finalized) = 1; - wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&buf->read_wait); } } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lib_ring_buffer *buf = shmp(chan->backend.buf); if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); @@ -744,62 +553,47 @@ void *channel_destroy(struct channel *chan) /* * Perform flush before writing to finalized. */ - smp_wmb(); + cmm_smp_wmb(); CMM_ACCESS_ONCE(buf->finalized) = 1; - wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&buf->read_wait); } CMM_ACCESS_ONCE(chan->finalized) = 1; - wake_up_interruptible(&chan->hp_wait); - wake_up_interruptible(&chan->read_wait); - kref_put(&chan->ref, channel_release); + //wake_up_interruptible(&chan->hp_wait); + //wake_up_interruptible(&chan->read_wait); + urcu_ref_put(&chan->ref, channel_release); priv = chan->backend.priv; return priv; } -EXPORT_SYMBOL_GPL(channel_destroy); struct lib_ring_buffer *channel_get_ring_buffer( const struct lib_ring_buffer_config *config, struct channel *chan, int cpu) { if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) - return chan->backend.buf; + return shmp(chan->backend.buf); else - return per_cpu_ptr(chan->backend.buf, cpu); + return &shmp(chan->backend.buf)[cpu]; } -EXPORT_SYMBOL_GPL(channel_get_ring_buffer); int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); - if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) + if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0) return -EBUSY; - kref_get(&chan->ref); - smp_mb__after_atomic_inc(); + urcu_ref_get(&chan->ref); + cmm_smp_mb(); return 0; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read); void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; - - CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); - smp_mb__before_atomic_dec(); - atomic_long_dec(&buf->active_readers); - kref_put(&chan->ref, channel_release); -} -EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read); + struct channel *chan = shmp(buf->backend.chan); -/* - * Promote compiler barrier to a smp_mb(). - * For the specific ring buffer case, this IPI call should be removed if the - * architecture does not reorder writes. This should eventually be provided by - * a separate architecture-specific infrastructure. - */ -static void remote_mb(void *info) -{ - smp_mb(); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); + cmm_smp_mb(); + uatomic_dec(&buf->active_readers); + urcu_ref_put(&chan->ref, channel_release); } /** @@ -810,24 +604,22 @@ static void remote_mb(void *info) * * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no * data to read at consumed position, or 0 if the get operation succeeds. - * Busy-loop trying to get data if the tick_nohz sequence lock is held. */ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, unsigned long *consumed, unsigned long *produced) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long consumed_cur, write_offset; int finalized; -retry: finalized = CMM_ACCESS_ONCE(buf->finalized); /* * Read finalized before counters. */ - smp_rmb(); - consumed_cur = atomic_long_read(&buf->consumed); + cmm_smp_rmb(); + consumed_cur = uatomic_read(&buf->consumed); /* * No need to issue a memory barrier between consumed count read and * write offset read, because consumed count can only change @@ -858,12 +650,9 @@ nodata: */ if (finalized) return -ENODATA; - else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock)) - goto retry; else return -EAGAIN; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); /** * lib_ring_buffer_put_snapshot - move consumed counter forward @@ -874,22 +663,21 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, unsigned long consumed_new) { struct lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan = bufb->chan; + struct channel *chan = shmp(bufb->chan); unsigned long consumed; - CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); /* * Only push the consumed value forward. * If the consumed cmpxchg fails, this is because we have been pushed by * the writer in flight recorder mode. */ - consumed = atomic_long_read(&buf->consumed); + consumed = uatomic_read(&buf->consumed); while ((long) consumed - (long) consumed_new < 0) - consumed = atomic_long_cmpxchg(&buf->consumed, consumed, - consumed_new); + consumed = uatomic_cmpxchg(&buf->consumed, consumed, + consumed_new); } -EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); /** * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading @@ -898,12 +686,11 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); * * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no * data to read at consumed position, or 0 if the get operation succeeds. - * Busy-loop trying to get data if the tick_nohz sequence lock is held. */ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, unsigned long consumed) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret; @@ -914,72 +701,21 @@ retry: /* * Read finalized before counters. */ - smp_rmb(); - consumed_cur = atomic_long_read(&buf->consumed); + cmm_smp_rmb(); + consumed_cur = uatomic_read(&buf->consumed); consumed_idx = subbuf_index(consumed, chan); - commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb); + commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb); /* * Make sure we read the commit count before reading the buffer * data and the write offset. Correct consumed offset ordering * wrt commit count is insured by the use of cmpxchg to update * the consumed offset. - * smp_call_function_single can fail if the remote CPU is offline, - * this is OK because then there is no wmb to execute there. - * If our thread is executing on the same CPU as the on the buffers - * belongs to, we don't have to synchronize it at all. If we are - * migrated, the scheduler will take care of the memory barriers. - * Normally, smp_call_function_single() should ensure program order when - * executing the remote function, which implies that it surrounds the - * function execution with : - * smp_mb() - * send IPI - * csd_lock_wait - * recv IPI - * smp_mb() - * exec. function - * smp_mb() - * csd unlock - * smp_mb() - * - * However, smp_call_function_single() does not seem to clearly execute - * such barriers. It depends on spinlock semantic to provide the barrier - * before executing the IPI and, when busy-looping, csd_lock_wait only - * executes smp_mb() when it has to wait for the other CPU. - * - * I don't trust this code. Therefore, let's add the smp_mb() sequence - * required ourself, even if duplicated. It has no performance impact - * anyway. - * - * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs - * read and write vs write. They do not ensure core synchronization. We - * really have to ensure total order between the 3 barriers running on - * the 2 CPUs. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - if (config->sync == RING_BUFFER_SYNC_PER_CPU - && config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - if (raw_smp_processor_id() != buf->backend.cpu) { - /* Total order with IPI handler smp_mb() */ - smp_mb(); - smp_call_function_single(buf->backend.cpu, - remote_mb, NULL, 1); - /* Total order with IPI handler smp_mb() */ - smp_mb(); - } - } else { - /* Total order with IPI handler smp_mb() */ - smp_mb(); - smp_call_function(remote_mb, NULL, 1); - /* Total order with IPI handler smp_mb() */ - smp_mb(); - } - } else { - /* - * Local rmb to match the remote wmb to read the commit count - * before the buffer data and the write offset. - */ - smp_rmb(); - } + /* + * Local rmb to match the remote wmb to read the commit count + * before the buffer data and the write offset. + */ + cmm_smp_rmb(); write_offset = v_read(config, &buf->offset); @@ -1035,12 +771,9 @@ nodata: */ if (finalized) return -ENODATA; - else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock)) - goto retry; else return -EAGAIN; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf); /** * lib_ring_buffer_put_subbuf - release exclusive subbuffer access @@ -1049,11 +782,11 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf); void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) { struct lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan = bufb->chan; + struct channel *chan = shmp(bufb->chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; - CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); if (!buf->get_subbuf) { /* @@ -1074,9 +807,9 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) */ read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); v_add(config, v_read(config, - &bufb->array[read_sb_bindex]->records_unread), + &shmp(bufb->array)[read_sb_bindex]->records_unread), &bufb->records_read); - v_set(config, &bufb->array[read_sb_bindex]->records_unread, 0); + v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0); CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, bufb->buf_rsb.id)); subbuffer_id_set_noref(config, &bufb->buf_rsb.id); @@ -1097,7 +830,6 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) * if the writer concurrently updated it. */ } -EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf); /* * cons_offset is an iterator on all subbuffer offsets between the reader @@ -1113,12 +845,11 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, unsigned long cons_idx, commit_count, commit_count_sb; cons_idx = subbuf_index(cons_offset, chan); - commit_count = v_read(config, &buf->commit_hot[cons_idx].cc); - commit_count_sb = v_read(config, &buf->commit_cold[cons_idx].cc_sb); + commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc); + commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb); if (subbuf_offset(commit_count, chan) != 0) - printk(KERN_WARNING - "ring buffer %s, cpu %d: " + ERRMSG("ring buffer %s, cpu %d: " "commit count in subbuffer %lu,\n" "expecting multiples of %lu bytes\n" " [ %lu bytes committed, %lu bytes reader-visible ]\n", @@ -1126,7 +857,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, chan->backend.subbuf_size, commit_count, commit_count_sb); - printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n", + ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n", chan->backend.name, cpu, commit_count); } @@ -1150,15 +881,14 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, * references are left. */ write_offset = v_read(config, &buf->offset); - cons_offset = atomic_long_read(&buf->consumed); + cons_offset = uatomic_read(&buf->consumed); if (write_offset != cons_offset) - printk(KERN_WARNING - "ring buffer %s, cpu %d: " + ERRMSG("ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", chan->backend.name, cpu, write_offset, cons_offset); - for (cons_offset = atomic_long_read(&buf->consumed); + for (cons_offset = uatomic_read(&buf->consumed); (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset), chan) - cons_offset) > 0; @@ -1174,7 +904,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, const struct lib_ring_buffer_config *config = chan->backend.config; void *priv = chan->backend.priv; - printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " + ERRMSG("ring buffer %s, cpu %d: %lu records written, " "%lu records overrun\n", chan->backend.name, cpu, v_read(config, &buf->records_count), @@ -1183,8 +913,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, if (v_read(config, &buf->records_lost_full) || v_read(config, &buf->records_lost_wrap) || v_read(config, &buf->records_lost_big)) - printk(KERN_WARNING - "ring buffer %s, cpu %d: records were lost. Caused by:\n" + ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n" " [ %lu buffer full, %lu nest buffer wrap-around, " "%lu event too big ]\n", chan->backend.name, cpu, @@ -1216,18 +945,10 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); + cmm_smp_wmb(); v_add(config, config->cb.subbuffer_header_size(), - &buf->commit_hot[oldidx].cc); - commit_count = v_read(config, &buf->commit_hot[oldidx].cc); + &shmp(buf->commit_hot)[oldidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, commit_count, oldidx); @@ -1262,17 +983,9 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); - v_add(config, padding_size, &buf->commit_hot[oldidx].cc); - commit_count = v_read(config, &buf->commit_hot[oldidx].cc); + cmm_smp_wmb(); + v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, commit_count, oldidx); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, @@ -1303,18 +1016,10 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); + cmm_smp_wmb(); v_add(config, config->cb.subbuffer_header_size(), - &buf->commit_hot[beginidx].cc); - commit_count = v_read(config, &buf->commit_hot[beginidx].cc); + &shmp(buf->commit_hot)[beginidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, commit_count, beginidx); @@ -1347,17 +1052,9 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); - v_add(config, padding_size, &buf->commit_hot[endidx].cc); - commit_count = v_read(config, &buf->commit_hot[endidx].cc); + cmm_smp_wmb(); + v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1, commit_count, endidx); lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, @@ -1398,10 +1095,10 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * The next record that reserves space will be responsible for * populating the following subbuffer header. We choose not to populate * the next subbuffer header here because we want to be able to use - * SWITCH_ACTIVE for periodical buffer flush and CPU tick_nohz stop - * buffer flush, which must guarantee that all the buffer content - * (records and header timestamps) are visible to the reader. This is - * required for quiescence guarantees for the fusion merge. + * SWITCH_ACTIVE for periodical buffer flush, which must + * guarantee that all the buffer content (records and header + * timestamps) are visible to the reader. This is required for + * quiescence guarantees for the fusion merge. */ if (mode == SWITCH_FLUSH || off > 0) { if (unlikely(off == 0)) { @@ -1435,7 +1132,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, */ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; @@ -1482,7 +1179,6 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m */ lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc); } -EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); /* * Returns : @@ -1547,14 +1243,14 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, (buf_trunc(offsets->begin, chan) >> chan->backend.num_subbuf_order) - ((unsigned long) v_read(config, - &buf->commit_cold[sb_index].cc_sb) + &shmp(buf->commit_cold)[sb_index].cc_sb) & chan->commit_count_mask); if (likely(reserve_commit_diff == 0)) { /* Next subbuffer not being written to. */ if (unlikely(config->mode != RING_BUFFER_OVERWRITE && subbuf_trunc(offsets->begin, chan) - subbuf_trunc((unsigned long) - atomic_long_read(&buf->consumed), chan) + uatomic_read(&buf->consumed), chan) >= chan->backend.buf_size)) { /* * We do not overwrite non consumed buffers @@ -1638,9 +1334,9 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) int ret; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - buf = per_cpu_ptr(chan->backend.buf, ctx->cpu); + buf = &shmp(chan->backend.buf)[ctx->cpu]; else - buf = chan->backend.buf; + buf = shmp(chan->backend.buf); ctx->buf = buf; offsets.size = 0; @@ -1696,4 +1392,3 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) ctx->buf_offset = offsets.begin + offsets.pre_header_padding; return 0; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);