This is just a rename and a change of semantic.
The lttng_data_pending returns 0 if _NO_ data is pending meaning that
the buffers are ready to be read safely. A value of 1 means that data is
still pending so the buffers are not ready for any read.
This is the same semantic as lttng_data_available but in reverse order.
Signed-off-by: David Goulet <dgoulet@efficios.com>
/*
* For a given session name, this call checks if the data is ready to be read
- * or is still being extracted by the consumer(s) hence not ready to be used by
- * any readers.
+ * or is still being extracted by the consumer(s) (pending) hence not ready to
+ * be used by any readers.
*
- * Return 0 if the data is _NOT_ available else 1 if the data is ready. On
- * error, a negative value is returned and readable by lttng_strerror().
+ * Return 0 if there is _no_ data pending in the buffers thus having a
+ * guarantee that the data can be read safely. Else, return 1 if there is still
+ * traced data is pending. On error, a negative value is returned and readable
+ * by lttng_strerror().
*/
-extern int lttng_data_available(const char *session_name);
+extern int lttng_data_pending(const char *session_name);
#ifdef __cplusplus
}
}
/*
- * Check for data availability for a given stream id from the session daemon.
+ * Check for data pending for a given stream id from the session daemon.
*/
static
-int relay_data_available(struct lttcomm_relayd_hdr *recv_hdr,
+int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd, struct lttng_ht *streams_ht)
{
struct relay_session *session = cmd->session;
- struct lttcomm_relayd_data_available msg;
+ struct lttcomm_relayd_data_pending msg;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
int ret;
struct lttng_ht_iter iter;
uint64_t last_net_seq_num, stream_id;
- DBG("Data available command received");
+ DBG("Data pending command received");
if (!session || session->version_check_done == 0) {
ERR("Trying to check for data before version check");
ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL);
if (ret < sizeof(msg)) {
- ERR("Relay didn't receive valid data_available struct size : %d", ret);
+ ERR("Relay didn't receive valid data_pending struct size : %d", ret);
ret = -1;
goto end_no_session;
}
stream = caa_container_of(node, struct relay_stream, stream_n);
assert(stream);
- DBG("Data available for stream id %" PRIu64 " prev_seq %" PRIu64
+ DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
" and last_seq %" PRIu64, stream_id, stream->prev_seq,
last_net_seq_num);
/* Avoid wrapping issue */
if (((int64_t) (stream->prev_seq - last_net_seq_num)) <= 0) {
- /* Data has in fact been written and is available */
- ret = 1;
- } else {
- /* Data still being streamed. */
+ /* Data has in fact been written and is NOT pending */
ret = 0;
+ } else {
+ /* Data still being streamed thus pending */
+ ret = 1;
}
end_unlock:
reply.ret_code = htobe32(ret);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
- ERR("Relay data available ret code failed");
+ ERR("Relay data pending ret code failed");
}
end_no_session:
reply.ret_code = htobe32(LTTNG_OK);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
- ERR("Relay data available ret code failed");
+ ERR("Relay data quiescent control ret code failed");
}
return ret;
case RELAYD_CLOSE_STREAM:
ret = relay_close_stream(recv_hdr, cmd, streams_ht);
break;
- case RELAYD_DATA_AVAILABLE:
- ret = relay_data_available(recv_hdr, cmd, streams_ht);
+ case RELAYD_DATA_PENDING:
+ ret = relay_data_pending(recv_hdr, cmd, streams_ht);
break;
case RELAYD_QUIESCENT_CONTROL:
ret = relay_quiescent_control(recv_hdr, cmd);
}
/*
- * Command LTTNG_DATA_AVAILABLE returning 0 if the data is NOT ready to be read
- * or else 1 if the data is available for trace analysis.
+ * Command LTTNG_DATA_PENDING returning 0 if the data is NOT pending meaning
+ * ready for trace analysis (or anykind of reader) or else 1 for pending data.
*/
-int cmd_data_available(struct ltt_session *session)
+int cmd_data_pending(struct ltt_session *session)
{
int ret;
struct ltt_kernel_session *ksess = session->kernel_session;
}
if (ksess && ksess->consumer) {
- ret = consumer_is_data_available(ksess->id, ksess->consumer);
- if (ret == 0) {
+ ret = consumer_is_data_pending(ksess->id, ksess->consumer);
+ if (ret == 1) {
/* Data is still being extracted for the kernel. */
goto error;
}
}
if (usess && usess->consumer) {
- ret = consumer_is_data_available(usess->id, usess->consumer);
- if (ret == 0) {
+ ret = consumer_is_data_pending(usess->id, usess->consumer);
+ if (ret == 1) {
/* Data is still being extracted for the kernel. */
goto error;
}
}
/* Data is ready to be read by a viewer */
- ret = 1;
+ ret = 0;
error:
return ret;
ssize_t cmd_list_tracepoints(int domain, struct lttng_event **events);
int cmd_calibrate(int domain, struct lttng_calibrate *calibrate);
-int cmd_data_available(struct ltt_session *session);
+int cmd_data_pending(struct ltt_session *session);
#endif /* CMD_H */
}
/*
- * Ask the consumer if the data is ready to bread (available) for the specific
+ * Ask the consumer if the data is ready to read (NOT pending) for the specific
* session id.
*
* This function has a different behavior with the consumer i.e. that it waits
- * for a reply from the consumer if yes or no the data is available.
+ * for a reply from the consumer if yes or no the data is pending.
*/
-int consumer_is_data_available(unsigned int id,
+int consumer_is_data_pending(unsigned int id,
struct consumer_output *consumer)
{
int ret;
- int32_t ret_code = 1; /* Default is that the data is available */
+ int32_t ret_code = 0; /* Default is that the data is NOT pending */
struct consumer_socket *socket;
struct lttng_ht_iter iter;
struct lttcomm_consumer_msg msg;
assert(consumer);
- msg.cmd_type = LTTNG_CONSUMER_DATA_AVAILABLE;
+ msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
- msg.u.data_available.session_id = (uint64_t) id;
+ msg.u.data_pending.session_id = (uint64_t) id;
- DBG3("Consumer data available for id %u", id);
+ DBG3("Consumer data pending for id %u", id);
/* Send command for each consumer */
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
if (ret < 0) {
- PERROR("send consumer data available command");
+ PERROR("send consumer data pending command");
pthread_mutex_unlock(socket->lock);
goto error;
}
- /*
- * Waiting for the reply code where 0 the data is not available and 1
- * it is for trace reading.
- */
ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
if (ret < 0) {
- PERROR("recv consumer data available status");
+ PERROR("recv consumer data pending status");
pthread_mutex_unlock(socket->lock);
goto error;
}
pthread_mutex_unlock(socket->lock);
- if (ret_code == 0) {
+ if (ret_code == 1) {
break;
}
}
- DBG("Consumer data available ret %d", ret_code);
+ DBG("Consumer data pending ret %d", ret_code);
return ret_code;
error:
uint64_t mmap_len,
const char *name,
unsigned int nb_init_streams);
-int consumer_is_data_available(unsigned int id,
+int consumer_is_data_pending(unsigned int id,
struct consumer_output *consumer);
#endif /* _CONSUMER_H */
case LTTNG_LIST_DOMAINS:
case LTTNG_START_TRACE:
case LTTNG_STOP_TRACE:
- case LTTNG_DATA_AVAILABLE:
+ case LTTNG_DATA_PENDING:
need_domain = 0;
break;
default:
bytecode);
break;
}
- case LTTNG_DATA_AVAILABLE:
+ case LTTNG_DATA_PENDING:
{
- ret = cmd_data_available(cmd_ctx->session);
+ ret = cmd_data_pending(cmd_ctx->session);
break;
}
default:
* Check if for a given session id there is still data needed to be extract
* from the buffers.
*
- * Return 1 if data is in fact available to be read or else 0.
+ * Return 1 if data is pending or else 0 meaning ready to be read.
*/
-int consumer_data_available(uint64_t id)
+int consumer_data_pending(uint64_t id)
{
int ret;
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
struct consumer_relayd_sock_pair *relayd;
- int (*data_available)(struct lttng_consumer_stream *);
+ int (*data_pending)(struct lttng_consumer_stream *);
- DBG("Consumer data available command on session id %" PRIu64, id);
+ DBG("Consumer data pending command on session id %" PRIu64, id);
rcu_read_lock();
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- data_available = lttng_kconsumer_data_available;
+ data_pending = lttng_kconsumer_data_pending;
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- data_available = lttng_ustconsumer_data_available;
+ data_pending = lttng_ustconsumer_data_pending;
break;
default:
ERR("Unknown consumer data type");
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
if (!ret) {
- goto data_not_available;
+ goto data_not_pending;
}
/*
ret = cds_lfht_is_node_deleted(&stream->node.node);
if (!ret) {
/* Check the stream if there is data in the buffers. */
- ret = data_available(stream);
- if (ret == 0) {
+ ret = data_pending(stream);
+ if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_available;
+ goto data_not_pending;
}
}
* are or will be marked for deletion hence no data pending.
*/
pthread_mutex_unlock(&stream->lock);
- goto data_not_available;
+ goto data_not_pending;
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock);
} else {
- ret = relayd_data_available(&relayd->control_sock,
+ ret = relayd_data_pending(&relayd->control_sock,
stream->relayd_stream_id, stream->next_net_seq_num);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret == 0) {
+ if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_available;
+ goto data_not_pending;
}
}
pthread_mutex_unlock(&stream->lock);
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
- return 1;
+ return 0;
-data_not_available:
+data_not_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
- return 0;
+ return 1;
}
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
/* Return to the sessiond if there is data pending for a session */
- LTTNG_CONSUMER_DATA_AVAILABLE,
+ LTTNG_CONSUMER_DATA_PENDING,
};
/* State of each fd in consumer */
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
-int consumer_data_available(uint64_t id);
+int consumer_data_pending(uint64_t id);
#endif /* LIB_CONSUMER_H */
#define DEFAULT_HEALTH_CHECK_DELTA_NS 0
/*
- * Wait period before retrying the lttng_data_available command in the lttng
+ * Wait period before retrying the lttng_data_pending command in the lttng
* stop command of liblttng-ctl.
*/
#define DEFAULT_DATA_AVAILABILITY_WAIT_TIME 200000 /* usec */
goto end_nosignal;
}
- case LTTNG_CONSUMER_DATA_AVAILABLE:
+ case LTTNG_CONSUMER_DATA_PENDING:
{
int32_t ret;
- uint64_t id = msg.u.data_available.session_id;
+ uint64_t id = msg.u.data_pending.session_id;
- DBG("Kernel consumer data available command for id %" PRIu64, id);
+ DBG("Kernel consumer data pending command for id %" PRIu64, id);
- ret = consumer_data_available(id);
+ ret = consumer_data_pending(id);
/* Send back returned value to session daemon */
ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
if (ret < 0) {
- PERROR("send data available ret code");
+ PERROR("send data pending ret code");
}
break;
}
* stream. Consumer data lock MUST be acquired before calling this function
* and the stream lock.
*
- * Return 0 if the traced data are still getting read else 1 meaning that the
+ * Return 1 if the traced data are still getting read else 0 meaning that the
* data is available for trace viewer reading.
*/
-int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream)
+int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
{
int ret;
/* There is still data so let's put back this subbuffer. */
ret = kernctl_put_subbuf(stream->wait_fd);
assert(ret == 0);
+ ret = 1; /* Data is pending */
goto end;
}
- /* Data is available to be read for this stream. */
- ret = 1;
+ /* Data is NOT pending and ready to be read. */
+ ret = 0;
end:
return ret;
ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
-int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream);
+int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream);
#endif /* _LTTNG_KCONSUMER_H */
/*
* Check for data availability for a given stream id.
*
- * Return 0 if NOT available, 1 if so and a negative value on error.
+ * Return 0 if NOT pending, 1 if so and a negative value on error.
*/
-int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
uint64_t last_net_seq_num)
{
int ret;
- struct lttcomm_relayd_data_available msg;
+ struct lttcomm_relayd_data_pending msg;
struct lttcomm_relayd_generic_reply reply;
/* Code flow error. Safety net. */
assert(sock);
- DBG("Relayd data available for stream id %" PRIu64, stream_id);
+ DBG("Relayd data pending for stream id %" PRIu64, stream_id);
msg.stream_id = htobe64(stream_id);
msg.last_net_seq_num = htobe64(last_net_seq_num);
/* Send command */
- ret = send_command(sock, RELAYD_DATA_AVAILABLE, (void *) &msg,
+ ret = send_command(sock, RELAYD_DATA_PENDING, (void *) &msg,
sizeof(msg), 0);
if (ret < 0) {
goto error;
/* Return session id or negative ret code. */
if (reply.ret_code >= LTTNG_OK) {
ret = -reply.ret_code;
- ERR("Relayd data available replied error %d", ret);
+ ERR("Relayd data pending replied error %d", ret);
}
/* At this point, the ret code is either 1 or 0 */
ret = reply.ret_code;
- DBG("Relayd data is %s available for stream id %" PRIu64,
- ret == 1 ? "" : "NOT", stream_id);
+ DBG("Relayd data is %s pending for stream id %" PRIu64,
+ ret == 1 ? "NOT" : "", stream_id);
error:
return ret;
}
/* Control socket is quiescent */
- return 1;
+ return 0;
error:
return ret;
int relayd_send_metadata(struct lttcomm_sock *sock, size_t len);
int relayd_send_data_hdr(struct lttcomm_sock *sock,
struct lttcomm_relayd_data_hdr *hdr, size_t size);
-int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
uint64_t last_net_seq_num);
int relayd_quiescent_control(struct lttcomm_sock *sock);
} __attribute__ ((__packed__));
/*
- * Used to test if for a given stream id the data is available on the relayd
- * side for reading.
+ * Used to test if for a given stream id the data is pending on the relayd side
+ * for reading.
*/
-struct lttcomm_relayd_data_available {
+struct lttcomm_relayd_data_pending {
uint64_t stream_id;
uint64_t last_net_seq_num; /* Sequence number of the last packet */
} __attribute__ ((__packed__));
RELAYD_VERSION,
RELAYD_SEND_METADATA,
RELAYD_CLOSE_STREAM,
- RELAYD_DATA_AVAILABLE,
+ RELAYD_DATA_PENDING,
RELAYD_QUIESCENT_CONTROL,
LTTNG_SET_FILTER,
LTTNG_HEALTH_CHECK,
- LTTNG_DATA_AVAILABLE,
+ LTTNG_DATA_PENDING,
};
/*
} destroy_relayd;
struct {
uint64_t session_id;
- } data_available;
+ } data_pending;
} u;
};
rcu_read_unlock();
return -ENOSYS;
}
- case LTTNG_CONSUMER_DATA_AVAILABLE:
+ case LTTNG_CONSUMER_DATA_PENDING:
{
int32_t ret;
- uint64_t id = msg.u.data_available.session_id;
+ uint64_t id = msg.u.data_pending.session_id;
- DBG("UST consumer data available command for id %" PRIu64, id);
+ DBG("UST consumer data pending command for id %" PRIu64, id);
- ret = consumer_data_available(id);
+ ret = consumer_data_pending(id);
/* Send back returned value to session daemon */
ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
if (ret < 0) {
- PERROR("send data available ret code");
+ PERROR("send data pending ret code");
}
break;
}
* stream. Consumer data lock MUST be acquired before calling this function
* and the stream lock.
*
- * Return 0 if the traced data are still getting read else 1 meaning that the
+ * Return 1 if the traced data are still getting read else 0 meaning that the
* data is available for trace viewer reading.
*/
-int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
{
int ret;
assert(stream);
- DBG("UST consumer checking data availability");
+ DBG("UST consumer checking data pending");
ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */
ret = ustctl_put_subbuf(stream->chan->handle, stream->buf);
assert(ret == 0);
+ ret = 1; /* Data is pending */
goto end;
}
- /* Data is available to be read for this stream. */
- ret = 1;
+ /* Data is NOT pending so ready to be read. */
+ ret = 0;
end:
return ret;
extern int lttng_ustctl_get_mmap_read_offset(
struct lttng_ust_shm_handle *handle,
struct lttng_ust_lib_ring_buffer *buf, unsigned long *off);
-int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream);
#else /* HAVE_LIBLTTNG_UST_CTL */
return -ENOSYS;
}
static inline
-int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
{
return -ENOSYS;
}
/* Check for data availability */
do {
- data_ret = lttng_data_available(session_name);
+ data_ret = lttng_data_pending(session_name);
if (data_ret < 0) {
/* Return the data available call error. */
ret = data_ret;
* Data sleep time before retrying (in usec). Don't sleep if the call
* returned value indicates availability.
*/
- if (!data_ret) {
+ if (data_ret) {
usleep(DEFAULT_DATA_AVAILABILITY_WAIT_TIME);
_MSG(".");
}
- } while (data_ret != 1);
+ } while (data_ret != 0);
MSG("");
* or is still being extracted by the consumer(s) hence not ready to be used by
* any readers.
*/
-int lttng_data_available(const char *session_name)
+int lttng_data_pending(const char *session_name)
{
int ret;
struct lttcomm_session_msg lsm;
return -LTTNG_ERR_INVALID;
}
- lsm.cmd_type = LTTNG_DATA_AVAILABLE;
+ lsm.cmd_type = LTTNG_DATA_PENDING;
copy_string(lsm.session.name, session_name, sizeof(lsm.session.name));