int net_index,
unsigned int metadata_flag,
const char *name,
- const char *pathname)
+ const char *pathname,
+ unsigned int session_id)
{
assert(msg);
msg->u.stream.gid = gid;
msg->u.stream.net_index = net_index;
msg->u.stream.metadata_flag = metadata_flag;
+ msg->u.stream.session_id = (uint64_t) session_id;
strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
strncpy(msg->u.stream.path_name, pathname,
int net_index,
unsigned int metadata_flag,
const char *name,
- const char *pathname);
+ const char *pathname,
+ unsigned int session_id);
void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
int channel_key,
consumer->net_seq_index,
1, /* Metadata flag set */
"metadata",
- pathname);
+ pathname,
+ session->id);
/* Send stream and file descriptor */
ret = consumer_send_stream(sock, consumer, &lkm,
consumer->net_seq_index,
0, /* Metadata flag unset */
stream->name,
- pathname);
+ pathname,
+ session->id);
/* Send stream and file descriptor */
ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
consumer->net_seq_index,
0, /* Metadata flag unset */
stream->name,
- pathname);
+ pathname,
+ usess->id);
/* Send stream and file descriptor */
fds[0] = stream->obj->shm_fd;
consumer->net_seq_index,
1, /* Flag metadata set */
"metadata",
- pathname);
+ pathname,
+ usess->id);
/* Send stream and file descriptor */
fds[0] = usess->metadata->stream_obj->shm_fd;
iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
+
+ /* Remove node session id from the consumer_data stream ht */
+ iter.iter.node = &stream->node_session_id.node;
+ ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ assert(!ret);
rcu_read_unlock();
assert(consumer_data.stream_count > 0);
lttng_ht_add_unique_ulong(ht, &stream->node);
+ /*
+ * Add stream to the stream_list_ht of the consumer data. No need to steal
+ * the key since the HT does not use it and we allow to add redundant keys
+ * into this table.
+ */
+ lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
+
+ /* Remove node session id from the consumer_data stream ht */
+ iter.iter.node = &stream->node_session_id.node;
+ ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ assert(!ret);
rcu_read_unlock();
if (stream->out_fd >= 0) {
consumer_steal_stream_key(stream->key, ht);
lttng_ht_add_unique_ulong(ht, &stream->node);
+
+ /*
+ * Add stream to the stream_list_ht of the consumer data. No need to steal
+ * the key since the HT does not use it and we allow to add redundant keys
+ * into this table.
+ */
+ lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
error:
return ret;
}
+
+/*
+ * Check if for a given session id there is still data needed to be extract
+ * from the buffers.
+ *
+ * Return 1 if data is in fact available to be read or else 0.
+ */
+int consumer_data_available(uint64_t id)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ int (*data_available)(struct lttng_consumer_stream *);
+
+ DBG("Consumer data available command on session id %" PRIu64, id);
+
+ pthread_mutex_lock(&consumer_data.lock);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ data_available = lttng_kconsumer_data_available;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ data_available = lttng_ustconsumer_data_available;
+ break;
+ default:
+ ERR("Unknown consumer data type");
+ assert(0);
+ }
+
+ /* Ease our life a bit */
+ ht = consumer_data.stream_list_ht;
+
+ cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct,
+ ht->match_fct, (void *)((unsigned long) id),
+ &iter.iter, stream, node_session_id.node) {
+ /* Check the stream for data. */
+ ret = data_available(stream);
+ if (ret == 0) {
+ goto data_not_available;
+ }
+ }
+
+ /* TODO: Support to ask the relayd if the streams are remote */
+
+ /*
+ * Finding _no_ node in the hash table means that the stream(s) have been
+ * removed thus data is guaranteed to be available for analysis from the
+ * trace files. This is *only* true for local consumer and not network
+ * streaming.
+ */
+
+ /* Data is available to be read by a viewer. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ return 1;
+
+data_not_available:
+ /* Data is still being extracted from buffers. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ return 0;
+}
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
+int consumer_data_available(uint64_t id);
#endif /* LIB_CONSUMER_H */
return ret;
}
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function.
+ *
+ * Return 0 if the traced data are still getting read else 1 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ /*
+ * Try to lock the stream mutex. On failure, we know that the stream is
+ * being used else where hence there is data still being extracted.
+ */
+ ret = pthread_mutex_trylock(&stream->lock);
+ if (ret == EBUSY) {
+ goto data_not_available;
+ }
+ /* The stream is now locked so we can do our ustctl calls */
+
+ ret = kernctl_get_next_subbuf(stream->wait_fd);
+ if (ret == 0) {
+ /* There is still data so let's put back this subbuffer. */
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ assert(ret == 0);
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_available;
+ }
+
+ /* Data is available to be read for this stream. */
+ pthread_mutex_unlock(&stream->lock);
+ return 1;
+
+data_not_available:
+ return 0;
+}
ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
+int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream);
#endif /* _LTTNG_KCONSUMER_H */
}
case LTTNG_CONSUMER_DATA_AVAILABLE:
{
- rcu_read_unlock();
- return -ENOSYS;
+ int32_t ret;
+ uint64_t id = msg.u.data_available.session_id;
+
+ DBG("UST consumer data available command for id %" PRIu64, id);
+
+ ret = consumer_data_available(id);
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send data available ret code");
+ }
+ break;
}
default:
break;
error:
return ret;
}
+
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function.
+ *
+ * Return 0 if the traced data are still getting read else 1 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ /*
+ * Try to lock the stream mutex. On failure, we know that the stream is
+ * being used else where hence there is data still being extracted.
+ */
+ ret = pthread_mutex_trylock(&stream->lock);
+ if (ret == EBUSY) {
+ goto data_not_available;
+ }
+ /* The stream is now locked so we can do our ustctl calls */
+
+ ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf);
+ if (ret == 0) {
+ /* There is still data so let's put back this subbuffer. */
+ ret = ustctl_put_subbuf(stream->chan->handle, stream->buf);
+ assert(ret == 0);
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_available;
+ }
+
+ /* Data is available to be read for this stream. */
+ pthread_mutex_unlock(&stream->lock);
+ return 1;
+
+data_not_available:
+ return 0;
+}
extern int lttng_ustctl_get_mmap_read_offset(
struct lttng_ust_shm_handle *handle,
struct lttng_ust_lib_ring_buffer *buf, unsigned long *off);
+int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream);
#else /* HAVE_LIBLTTNG_UST_CTL */
{
return -ENOSYS;
}
+static inline
+int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+{
+ return -ENOSYS;
+}
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTTNG_USTCONSUMER_H */