unsigned long *consumed);
typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
unsigned long *produced);
+typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
static struct timer_signal_data timer_signal = {
.tid = 0,
return ret;
}
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+static int check_stream(struct lttng_consumer_stream *stream,
+ flush_index_cb flush_index)
{
int ret;
}
break;
}
- ret = consumer_flush_kernel_index(stream);
+ ret = flush_index(stream);
pthread_mutex_unlock(&stream->lock);
end:
return ret;
return ret;
}
-static int check_ust_stream(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
- assert(stream->ustream);
- /*
- * While holding the stream mutex, try to take a snapshot, if it
- * succeeds, it means that data is ready to be sent, just let the data
- * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
- * means that there is no data to read after the flush, so we can
- * safely send the empty index.
- *
- * Doing a trylock and checking if waiting on metadata if
- * trylock fails. Bail out of the stream is indeed waiting for
- * metadata to be pushed. Busy wait on trylock otherwise.
- */
- for (;;) {
- ret = pthread_mutex_trylock(&stream->lock);
- switch (ret) {
- case 0:
- break; /* We have the lock. */
- case EBUSY:
- pthread_mutex_lock(&stream->metadata_timer_lock);
- if (stream->waiting_on_metadata) {
- ret = 0;
- stream->missed_metadata_flush = true;
- pthread_mutex_unlock(&stream->metadata_timer_lock);
- goto end; /* Bail out. */
- }
- pthread_mutex_unlock(&stream->metadata_timer_lock);
- /* Try again. */
- caa_cpu_relax();
- continue;
- default:
- ERR("Unexpected pthread_mutex_trylock error %d", ret);
- ret = -1;
- goto end;
- }
- break;
- }
- ret = consumer_flush_ust_index(stream);
- pthread_mutex_unlock(&stream->lock);
-end:
- return ret;
-}
-
/*
* Execute action on a live timer
*/
int ret;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
- struct lttng_ht *ht;
struct lttng_ht_iter iter;
+ const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ const flush_index_cb flush_index =
+ ctx->type == LTTNG_CONSUMER_KERNEL ?
+ consumer_flush_kernel_index :
+ consumer_flush_ust_index;
channel = si->si_value.sival_ptr;
assert(channel);
if (channel->switch_timer_error) {
goto error;
}
- ht = consumer_data.stream_per_chan_id_ht;
DBG("Live timer for channel %" PRIu64, channel->key);
rcu_read_lock();
- switch (ctx->type) {
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
- ret = check_ust_stream(stream);
- if (ret < 0) {
- goto error_unlock;
- }
- }
- break;
- case LTTNG_CONSUMER_KERNEL:
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
- ret = check_kernel_stream(stream);
- if (ret < 0) {
- goto error_unlock;
- }
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ ret = check_stream(stream, flush_index);
+ if (ret < 0) {
+ goto error_unlock;
}
- break;
- case LTTNG_CONSUMER_UNKNOWN:
- assert(0);
- break;
}
error_unlock: