return ret;
}
+/*
+ * Check for data availability for a given stream id from the session daemon.
+ */
+static
+int relay_data_available(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ struct relay_session *session = cmd->session;
+ struct lttcomm_relayd_data_available msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ int ret;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+ uint64_t last_net_seq_num, stream_id;
+
+ DBG("Data available command received");
+
+ if (!session || session->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL);
+ if (ret < sizeof(msg)) {
+ ERR("Relay didn't receive valid data_available struct size : %d", ret);
+ ret = -1;
+ goto end_no_session;
+ }
+
+ stream_id = be64toh(msg.stream_id);
+ last_net_seq_num = be64toh(msg.last_net_seq_num);
+
+ rcu_read_lock();
+ lttng_ht_lookup(streams_ht, (void *)((unsigned long) stream_id), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ DBG("Relay stream %" PRIu64 " not found", stream_id);
+ ret = -1;
+ goto end_unlock;
+ }
+
+ stream = caa_container_of(node, struct relay_stream, stream_n);
+ assert(stream);
+
+ DBG("Data available for stream id %" PRIu64 " prev_seq %" PRIu64
+ " and last_seq %" PRIu64, stream_id, stream->prev_seq,
+ last_net_seq_num);
+
+ if (stream->prev_seq == -1UL || stream->prev_seq <= last_net_seq_num) {
+ /* Data has in fact been written and is available */
+ ret = 1;
+ } else {
+ /* Data still being streamed. */
+ ret = 0;
+ }
+
+end_unlock:
+ rcu_read_unlock();
+
+ reply.ret_code = htobe32(ret);
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay data available ret code failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Wait for the control socket to reach a quiescent state.
+ *
+ * Note that for now, when receiving this command from the session daemon, this
+ * means that every subsequent commands or data received on the control socket
+ * has been handled. So, this is why we simply return OK here.
+ */
+static
+int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ DBG("Checking quiescent state on control socket");
+
+ reply.ret_code = htobe32(LTTNG_OK);
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay data available ret code failed");
+ }
+
+ return ret;
+}
+
/*
* relay_process_control: Process the commands received on the control socket
*/
case RELAYD_CLOSE_STREAM:
ret = relay_close_stream(recv_hdr, cmd, streams_ht);
break;
+ case RELAYD_DATA_AVAILABLE:
+ ret = relay_data_available(recv_hdr, cmd, streams_ht);
+ break;
+ case RELAYD_QUIESCENT_CONTROL:
+ ret = relay_quiescent_control(recv_hdr, cmd);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
DBG3("Consumer data available for id %u", id);
+ /* Send command for each consumer */
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
node.node) {
/* Code flow error */
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
lttng_consumer_sync_trace_file(stream, orig_offset);
end:
+ pthread_mutex_unlock(&stream->lock);
/* Unlock only if ctrl socket used */
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
}
end:
+ pthread_mutex_unlock(&stream->lock);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
+ struct consumer_relayd_sock_pair *relayd;
int (*data_available)(struct lttng_consumer_stream *);
DBG("Consumer data available command on session id %" PRIu64, id);
assert(0);
}
+ rcu_read_lock();
+
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
- cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct,
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct((void *)((unsigned long) id), 0x42UL),
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* Check the stream for data. */
if (ret == 0) {
goto data_not_available;
}
- }
- /* TODO: Support to ask the relayd if the streams are remote */
+ if (stream->net_seq_idx != -1) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ assert(relayd);
+
+ pthread_mutex_lock(&stream->lock);
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ if (stream->metadata_flag) {
+ ret = relayd_quiescent_control(&relayd->control_sock);
+ } else {
+ ret = relayd_data_available(&relayd->control_sock,
+ stream->relayd_stream_id, stream->next_net_seq_num);
+ }
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret == 0) {
+ goto data_not_available;
+ }
+ }
+ }
/*
* Finding _no_ node in the hash table means that the stream(s) have been
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
return 1;
data_not_available:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
return 0;
}
}
case LTTNG_CONSUMER_DATA_AVAILABLE:
{
- rcu_read_unlock();
- return -ENOSYS;
+ int32_t ret;
+ uint64_t id = msg.u.data_available.session_id;
+
+ DBG("Kernel consumer data available command for id %" PRIu64, id);
+
+ ret = consumer_data_available(id);
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send data available ret code");
+ }
+ break;
}
default:
goto end_nosignal;
error:
return ret;
}
+
+/*
+ * Check for data availability for a given stream id.
+ *
+ * Return 0 if NOT available, 1 if so and a negative value on error.
+ */
+int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id,
+ uint64_t last_net_seq_num)
+{
+ int ret;
+ struct lttcomm_relayd_data_available msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd data available for stream id %" PRIu64, stream_id);
+
+ msg.stream_id = htobe64(stream_id);
+ msg.last_net_seq_num = htobe64(last_net_seq_num);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_DATA_AVAILABLE, (void *) &msg,
+ sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code >= LTTNG_OK) {
+ ret = -reply.ret_code;
+ ERR("Relayd data available replied error %d", ret);
+ }
+
+ /* At this point, the ret code is either 1 or 0 */
+ ret = reply.ret_code;
+
+ DBG("Relayd data is %s available for stream id %" PRIu64,
+ ret == 1 ? "" : "NOT", stream_id);
+
+error:
+ return ret;
+}
+
+/*
+ * Check on the relayd side for a quiescent state on the control socket.
+ */
+int relayd_quiescent_control(struct lttcomm_sock *sock)
+{
+ int ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd checking quiescent control state");
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -reply.ret_code;
+ ERR("Relayd quiescent control replied error %d", ret);
+ goto error;
+ }
+
+ /* Control socket is quiescent */
+ return 1;
+
+error:
+ return ret;
+}
int relayd_send_metadata(struct lttcomm_sock *sock, size_t len);
int relayd_send_data_hdr(struct lttcomm_sock *sock,
struct lttcomm_relayd_data_hdr *hdr, size_t size);
+int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id,
+ uint64_t last_net_seq_num);
+int relayd_quiescent_control(struct lttcomm_sock *sock);
#endif /* _RELAYD_H */
uint64_t last_net_seq_num; /* sequence number of last packet */
} __attribute__ ((__packed__));
+/*
+ * Used to test if for a given stream id the data is available on the relayd
+ * side for reading.
+ */
+struct lttcomm_relayd_data_available {
+ uint64_t stream_id;
+ uint64_t last_net_seq_num; /* Sequence number of the last packet */
+} __attribute__ ((__packed__));
+
#endif /* _RELAYD_COMM */
RELAYD_VERSION,
RELAYD_SEND_METADATA,
RELAYD_CLOSE_STREAM,
+ RELAYD_DATA_AVAILABLE,
+ RELAYD_QUIESCENT_CONTROL,
LTTNG_SET_FILTER,
LTTNG_HEALTH_CHECK,
LTTNG_DATA_AVAILABLE,
assert(stream);
+ DBG("UST consumer checking data availability");
+
/*
* Try to lock the stream mutex. On failure, we know that the stream is
* being used else where hence there is data still being extracted.