return ret;
}
+/*
+ * Try to lock the stream mutex.
+ *
+ * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
+ */
+static int stream_try_lock(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ /*
+ * Try to lock the stream mutex. On failure, we know that the stream is
+ * being used else where hence there is data still being extracted.
+ */
+ ret = pthread_mutex_trylock(&stream->lock);
+ if (ret) {
+ /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
+ ret = 0;
+ goto end;
+ }
+
+ ret = 1;
+
+end:
+ return ret;
+}
+
/*
* Check if for a given session id there is still data needed to be extract
* from the buffers.
ht->hash_fct((void *)((unsigned long) id), 0x42UL),
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
- /* Check the stream for data. */
- ret = data_available(stream);
- if (ret == 0) {
+ /* If this call fails, the stream is being used hence data pending. */
+ ret = stream_try_lock(stream);
+ if (!ret) {
goto data_not_available;
}
+ /*
+ * A removed node from the hash table indicates that the stream has
+ * been deleted thus having a guarantee that the buffers are closed
+ * on the consumer side. However, data can still be transmitted
+ * over the network so don't skip the relayd check.
+ */
+ ret = cds_lfht_is_node_deleted(&stream->node.node);
+ if (!ret) {
+ /* Check the stream if there is data in the buffers. */
+ ret = data_available(stream);
+ if (ret == 0) {
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_available;
+ }
+ }
+
+ /* Relayd check */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
- assert(relayd);
+ if (!relayd) {
+ /*
+ * At this point, if the relayd object is not available for the
+ * given stream, it is because the relayd is being cleaned up
+ * so every stream associated with it (for a session id value)
+ * are or will be marked for deletion hence no data pending.
+ */
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_available;
+ }
- pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock);
stream->relayd_stream_id, stream->next_net_seq_num);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- pthread_mutex_unlock(&stream->lock);
if (ret == 0) {
+ pthread_mutex_unlock(&stream->lock);
goto data_not_available;
}
}
+ pthread_mutex_unlock(&stream->lock);
}
/*
/*
* Check if data is still being extracted from the buffers for a specific
- * stream. Consumer data lock MUST be acquired before calling this function.
+ * stream. Consumer data lock MUST be acquired before calling this function
+ * and the stream lock.
*
* Return 0 if the traced data are still getting read else 1 meaning that the
* data is available for trace viewer reading.
assert(stream);
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret == EBUSY) {
- /* Data not available */
- ret = 0;
- goto end;
- }
- /* The stream is now locked so we can do our ustctl calls */
-
ret = kernctl_get_next_subbuf(stream->wait_fd);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */
ret = kernctl_put_subbuf(stream->wait_fd);
assert(ret == 0);
- goto end_unlock;
+ goto end;
}
/* Data is available to be read for this stream. */
ret = 1;
-end_unlock:
- pthread_mutex_unlock(&stream->lock);
end:
return ret;
}
/*
* Check if data is still being extracted from the buffers for a specific
- * stream. Consumer data lock MUST be acquired before calling this function.
+ * stream. Consumer data lock MUST be acquired before calling this function
+ * and the stream lock.
*
* Return 0 if the traced data are still getting read else 1 meaning that the
* data is available for trace viewer reading.
DBG("UST consumer checking data availability");
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret == EBUSY) {
- /* Data not available */
- ret = 0;
- goto end;
- }
- /* The stream is now locked so we can do our ustctl calls */
-
ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */
ret = ustctl_put_subbuf(stream->chan->handle, stream->buf);
assert(ret == 0);
- goto end_unlock;
+ goto end;
}
/* Data is available to be read for this stream. */
ret = 1;
-end_unlock:
- pthread_mutex_unlock(&stream->lock);
end:
return ret;
}