X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;ds=sidebyside;f=libringbuffer%2Fring_buffer_frontend.c;h=8c8126025878ed72929bf6ecd8ebedf9b2806148;hb=3962118d1f08fd33ad6adfad10452962c3662bd9;hp=ea73bcc676351ac76f25eeae85282c1aa17bc25a;hpb=d1a996f5524540ebc8caa7c3c1bd3049018d212c;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index ea73bcc6..8c812602 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -128,15 +128,21 @@ void lib_ring_buffer_print_errors(struct channel *chan, * Handle timer teardown race wrt memory free of private data by * ring buffer signals are handled by a single thread, which permits * a synchronization point between handling of each signal. - * Protected by the ust mutex. + * Protected by the lock within the structure. */ struct timer_signal_data { pthread_t tid; /* thread id managing signals */ int setup_done; int qs_done; + pthread_mutex_t lock; }; -static struct timer_signal_data timer_signal; +static struct timer_signal_data timer_signal = { + .tid = 0, + .setup_done = 0, + .qs_done = 0, + .lock = PTHREAD_MUTEX_INITIALIZER, +}; /** * lib_ring_buffer_reset - Reset ring buffer to initial values. @@ -418,7 +424,6 @@ void *sig_thread(void *arg) } /* - * Called with ust_lock() held. * Ensure only a single thread listens on the timer signal. */ static @@ -427,8 +432,9 @@ void lib_ring_buffer_setup_timer_thread(void) pthread_t thread; int ret; + pthread_mutex_lock(&timer_signal.lock); if (timer_signal.setup_done) - return; + goto end; ret = pthread_create(&thread, NULL, &sig_thread, NULL); if (ret) { @@ -441,11 +447,64 @@ void lib_ring_buffer_setup_timer_thread(void) PERROR("pthread_detach"); } timer_signal.setup_done = 1; +end: + pthread_mutex_unlock(&timer_signal.lock); } /* - * Called with ust_lock() held. + * Wait for signal-handling thread quiescent state. */ +static +void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr) +{ + sigset_t pending_set; + int ret; + + /* + * We need to be the only thread interacting with the thread + * that manages signals for teardown synchronization. + */ + pthread_mutex_lock(&timer_signal.lock); + + /* + * Ensure we don't have any signal queued for this channel. + */ + for (;;) { + ret = sigemptyset(&pending_set); + if (ret == -1) { + PERROR("sigemptyset"); + } + ret = sigpending(&pending_set); + if (ret == -1) { + PERROR("sigpending"); + } + if (!sigismember(&pending_set, signr)) + break; + caa_cpu_relax(); + } + + /* + * From this point, no new signal handler will be fired that + * would try to access "chan". However, we still need to wait + * for any currently executing handler to complete. + */ + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 0); + cmm_smp_mb(); + + /* + * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management + * thread wakes up. + */ + kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); + + while (!CMM_LOAD_SHARED(timer_signal.qs_done)) + caa_cpu_relax(); + cmm_smp_mb(); + + pthread_mutex_unlock(&timer_signal.lock); +} + static void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) { @@ -479,13 +538,9 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) } } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) { - sigset_t pending_set; int ret; if (!chan->switch_timer_interval || !chan->switch_timer_enabled) @@ -496,49 +551,12 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) PERROR("timer_delete"); } - /* - * Ensure we don't have any signal queued for this channel. - */ - for (;;) { - ret = sigemptyset(&pending_set); - if (ret == -1) { - PERROR("sigemptyset"); - } - ret = sigpending(&pending_set); - if (ret == -1) { - PERROR("sigpending"); - } - if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_FLUSH)) - break; - caa_cpu_relax(); - } - - /* - * From this point, no new signal handler will be fired that - * would try to access "chan". However, we still need to wait - * for any currently executing handler to complete. - */ - cmm_smp_mb(); - CMM_STORE_SHARED(timer_signal.qs_done, 0); - cmm_smp_mb(); - - /* - * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management - * thread wakes up. - */ - kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); - - while (!CMM_LOAD_SHARED(timer_signal.qs_done)) - caa_cpu_relax(); - cmm_smp_mb(); + lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_FLUSH); chan->switch_timer = 0; chan->switch_timer_enabled = 0; } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_read_timer_start(struct channel *chan) { @@ -574,14 +592,10 @@ void lib_ring_buffer_channel_read_timer_start(struct channel *chan) } } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - sigset_t pending_set; int ret; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -599,42 +613,7 @@ void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) */ lib_ring_buffer_channel_do_read(chan); - - /* - * Ensure we don't have any signal queued for this channel. - */ - for (;;) { - ret = sigemptyset(&pending_set); - if (ret == -1) { - PERROR("sigemptyset"); - } - ret = sigpending(&pending_set); - if (ret == -1) { - PERROR("sigpending"); - } - if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_READ)) - break; - caa_cpu_relax(); - } - - /* - * From this point, no new signal handler will be fired that - * would try to access "chan". However, we still need to wait - * for any currently executing handler to complete. - */ - cmm_smp_mb(); - CMM_STORE_SHARED(timer_signal.qs_done, 0); - cmm_smp_mb(); - - /* - * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management - * thread wakes up. - */ - kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); - - while (!CMM_LOAD_SHARED(timer_signal.qs_done)) - caa_cpu_relax(); - cmm_smp_mb(); + lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_READ); chan->read_timer = 0; chan->read_timer_enabled = 0; @@ -1439,8 +1418,10 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, /* * lib_ring_buffer_switch_new_end: finish switching current subbuffer * - * The only remaining threads could be the ones with pending commits. They will - * have to do the deliver themselves. + * Calls subbuffer_set_data_size() to set the data size of the current + * sub-buffer. We do not need to perform check_deliver nor commit here, + * since this task will be done by the "commit" of the event for which + * we are currently doing the space reservation. */ static void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, @@ -1450,26 +1431,12 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - unsigned long endidx = subbuf_index(offsets->end - 1, chan); - unsigned long commit_count, padding_size, data_size; + unsigned long endidx, data_size; + endidx = subbuf_index(offsets->end - 1, chan); data_size = subbuf_offset(offsets->end - 1, chan) + 1; - padding_size = chan->backend.subbuf_size - data_size; subbuffer_set_data_size(config, &buf->backend, endidx, data_size, handle); - - /* - * Order all writes to buffer before the commit count update that will - * determine that the subbuffer is full. - */ - cmm_smp_wmb(); - v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc); - commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc); - lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1, - commit_count, endidx, handle); - lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, - offsets->end, commit_count, - padding_size, handle); } /*