From: David Goulet Date: Thu, 25 Oct 2012 19:48:15 +0000 (-0400) Subject: Fix: Synchronization issue for data available command X-Git-Tag: v2.1.0-rc6~22 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=4e9a468645939ea62541fa802893b928b01888b7;p=lttng-tools.git Fix: Synchronization issue for data available command Signed-off-by: David Goulet --- diff --git a/src/common/consumer.c b/src/common/consumer.c index d245ed7ee..efd9e7eb3 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -2671,6 +2671,34 @@ error: return ret; } +/* + * Try to lock the stream mutex. + * + * On success, 1 is returned else 0 indicating that the mutex is NOT lock. + */ +static int stream_try_lock(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + /* + * Try to lock the stream mutex. On failure, we know that the stream is + * being used else where hence there is data still being extracted. + */ + ret = pthread_mutex_trylock(&stream->lock); + if (ret) { + /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ + ret = 0; + goto end; + } + + ret = 1; + +end: + return ret; +} + /* * Check if for a given session id there is still data needed to be extract * from the buffers. @@ -2711,17 +2739,42 @@ int consumer_data_available(uint64_t id) ht->hash_fct((void *)((unsigned long) id), 0x42UL), ht->match_fct, (void *)((unsigned long) id), &iter.iter, stream, node_session_id.node) { - /* Check the stream for data. */ - ret = data_available(stream); - if (ret == 0) { + /* If this call fails, the stream is being used hence data pending. */ + ret = stream_try_lock(stream); + if (!ret) { goto data_not_available; } + /* + * A removed node from the hash table indicates that the stream has + * been deleted thus having a guarantee that the buffers are closed + * on the consumer side. However, data can still be transmitted + * over the network so don't skip the relayd check. + */ + ret = cds_lfht_is_node_deleted(&stream->node.node); + if (!ret) { + /* Check the stream if there is data in the buffers. */ + ret = data_available(stream); + if (ret == 0) { + pthread_mutex_unlock(&stream->lock); + goto data_not_available; + } + } + + /* Relayd check */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); - assert(relayd); + if (!relayd) { + /* + * At this point, if the relayd object is not available for the + * given stream, it is because the relayd is being cleaned up + * so every stream associated with it (for a session id value) + * are or will be marked for deletion hence no data pending. + */ + pthread_mutex_unlock(&stream->lock); + goto data_not_available; + } - pthread_mutex_lock(&stream->lock); pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock); @@ -2730,11 +2783,12 @@ int consumer_data_available(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num); } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - pthread_mutex_unlock(&stream->lock); if (ret == 0) { + pthread_mutex_unlock(&stream->lock); goto data_not_available; } } + pthread_mutex_unlock(&stream->lock); } /* diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 249df8a47..196deee9a 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -485,7 +485,8 @@ error: /* * Check if data is still being extracted from the buffers for a specific - * stream. Consumer data lock MUST be acquired before calling this function. + * stream. Consumer data lock MUST be acquired before calling this function + * and the stream lock. * * Return 0 if the traced data are still getting read else 1 meaning that the * data is available for trace viewer reading. @@ -496,31 +497,17 @@ int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream) assert(stream); - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret == EBUSY) { - /* Data not available */ - ret = 0; - goto end; - } - /* The stream is now locked so we can do our ustctl calls */ - ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */ ret = kernctl_put_subbuf(stream->wait_fd); assert(ret == 0); - goto end_unlock; + goto end; } /* Data is available to be read for this stream. */ ret = 1; -end_unlock: - pthread_mutex_unlock(&stream->lock); end: return ret; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index e8e3f9396..4d3671a34 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -526,7 +526,8 @@ error: /* * Check if data is still being extracted from the buffers for a specific - * stream. Consumer data lock MUST be acquired before calling this function. + * stream. Consumer data lock MUST be acquired before calling this function + * and the stream lock. * * Return 0 if the traced data are still getting read else 1 meaning that the * data is available for trace viewer reading. @@ -539,31 +540,17 @@ int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream) DBG("UST consumer checking data availability"); - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret == EBUSY) { - /* Data not available */ - ret = 0; - goto end; - } - /* The stream is now locked so we can do our ustctl calls */ - ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf); if (ret == 0) { /* There is still data so let's put back this subbuffer. */ ret = ustctl_put_subbuf(stream->chan->handle, stream->buf); assert(ret == 0); - goto end_unlock; + goto end; } /* Data is available to be read for this stream. */ ret = 1; -end_unlock: - pthread_mutex_unlock(&stream->lock); end: return ret; }