* Represents a session for the relay point of view
*/
struct relay_session {
+ /*
+ * This session id is used to identify a set of stream to a tracing session
+ * but also make sure we have a unique session id associated with a session
+ * daemon which can provide multiple data source.
+ */
uint64_t id;
struct lttcomm_sock *sock;
};
/* Information telling us when to close the stream */
unsigned int close_flag:1;
uint64_t last_net_seq_num;
+ /* Indicate if the stream was initialized for a data pending command. */
+ unsigned int data_pending_check_done:1;
};
/*
{
if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
+ /*
+ * We are about to close the stream so set the data pending flag to 1
+ * which will make the end data pending command skip the stream which
+ * is now closed and ready. Note that after proceeding to a file close,
+ * the written file is ready for reading.
+ */
+ stream->data_pending_check_done = 1;
return 1;
}
return 0;
ret = 1;
}
+ /* Pending check is now done. */
+ stream->data_pending_check_done = 1;
+
end_unlock:
rcu_read_unlock();
return ret;
}
+/*
+ * Initialize a data pending command. This means that a client is about to ask
+ * for data pending for each stream he/she holds. Simply iterate over all
+ * streams of a session and set the data_pending_check_done flag.
+ *
+ * This command returns to the client a LTTNG_OK code.
+ */
+static
+int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+
+ assert(recv_hdr);
+ assert(cmd);
+ assert(streams_ht);
+
+ DBG("Init streams for data pending");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ ERR("Relay didn't receive valid begin data_pending struct size: %d",
+ ret);
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /*
+ * Iterate over all streams to set the begin data pending flag. For now, the
+ * streams are indexed by stream handle so we have to iterate over all
+ * streams to find the one associated with the right session_id.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->session->id == session_id) {
+ stream->data_pending_check_done = 0;
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(LTTNG_OK);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay begin data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * End data pending command. This will check, for a given session id, if each
+ * stream associated with it has its data_pending_check_done flag set. If not,
+ * this means that the client lost track of the stream but the data is still
+ * being streamed on our side. In this case, we inform the client that data is
+ * inflight.
+ *
+ * Return to the client if there is data in flight or not with a ret_code.
+ */
+static
+int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+ uint32_t is_data_inflight = 0;
+
+ assert(recv_hdr);
+ assert(cmd);
+ assert(streams_ht);
+
+ DBG("End data pending command");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ ERR("Relay didn't receive valid end data_pending struct size: %d",
+ ret);
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /* Iterate over all streams to see if the begin data pending flag is set. */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->session->id == session_id &&
+ !stream->data_pending_check_done) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(is_data_inflight);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay end data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* relay_process_control: Process the commands received on the control socket
*/
case RELAYD_QUIESCENT_CONTROL:
ret = relay_quiescent_control(recv_hdr, cmd);
break;
+ case RELAYD_BEGIN_DATA_PENDING:
+ ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);
+ break;
+ case RELAYD_END_DATA_PENDING:
+ ret = relay_end_data_pending(recv_hdr, cmd, streams_ht);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
/*
* This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond_id.
+ * and the relayd session id. Element of the ht are indexed by sessiond session
+ * id.
*
* Node can be added when a relayd communication is opened in the sessiond
* thread.
DBG("Consumer destroy and close relayd socket pair");
+ /* Loockup for a relayd node in the session id map hash table. */
lttng_ht_lookup(relayd_session_id_ht,
- (void *)((unsigned long) relayd->session_id), &iter);
+ (void *)((unsigned long) relayd->sessiond_session_id), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
/* We assume the relayd is being or is destroyed */
return;
}
- ret = lttng_ht_del(relayd_session_id_ht, &iter);
- if (ret != 0) {
- /* We assume the relayd is being or is destroyed */
- return;
- }
+ /*
+ * Try to delete it from the relayd session id ht. The return value is of
+ * no importance since either way we are going to try to delete the relayd
+ * from the global relayd_ht.
+ */
+ lttng_ht_del(relayd_session_id_ht, &iter);
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
}
lttng_ht_destroy(consumer_data.relayd_ht);
+ /* The destroy_relayd call makes sure that this ht is empty here. */
+ lttng_ht_destroy(relayd_session_id_ht);
rcu_read_unlock();
}
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto error;
}
+ relayd->sessiond_session_id = (uint64_t) sessiond_id;
}
/* Poll on consumer socket. */
* grab the socket lock since the relayd object is not yet visible.
*/
ret = relayd_create_session(&relayd->control_sock,
- &relayd->session_id);
+ &relayd->relayd_session_id);
if (ret < 0) {
goto error;
}
goto error;
}
- relayd_id_node->relayd_id = relayd->session_id;
+ relayd_id_node->relayd_id = relayd->relayd_session_id;
relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
- /* Indexed by session id of the session daemon. */
+ /* Indexed by session id of the sessiond. */
lttng_ht_node_init_ulong(&relayd_id_node->node,
relayd_id_node->sessiond_id);
rcu_read_lock();
return ret;
}
+/*
+ * Search for a relayd associated to the session id and return the reference.
+ *
+ * A rcu read side lock MUST be acquire before calling this function and locked
+ * until the relayd object is no longer necessary.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_session_id *session_id_map;
+
+ /* Get the session id map. */
+ lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ goto end;
+ }
+
+ session_id_map = caa_container_of(node, struct consumer_relayd_session_id,
+ node);
+
+ /* Iterate over all relayd since they are indexed by net_seq_idx. */
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ if (relayd->relayd_session_id == session_id_map->relayd_id) {
+ /* Found the relayd. There can be only one per id. */
+ break;
+ }
+ }
+
+end:
+ return relayd;
+}
+
/*
* Check if for a given session id there is still data needed to be extract
* from the buffers.
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
- struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+ }
+
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ht->match_fct, (void *)((unsigned long) id),
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
if (!ret) {
- goto data_not_pending;
+ goto data_pending;
}
/*
ret = data_pending(stream);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
/* Relayd check */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- /*
- * At this point, if the relayd object is not available for the
- * given stream, it is because the relayd is being cleaned up
- * so every stream associated with it (for a session id value)
- * are or will be marked for deletion hence no data pending.
- */
- pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
- }
-
+ if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
pthread_mutex_unlock(&stream->lock);
}
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_end_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id, &is_data_inflight);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0 || !is_data_inflight) {
+ /* On error or if NO data inflight, no data is pending. */
+ goto data_not_pending;
+ }
+ }
+
/*
- * Finding _no_ node in the hash table means that the stream(s) have been
- * removed thus data is guaranteed to be available for analysis from the
- * trace files. This is *only* true for local consumer and not network
- * streaming.
+ * Finding _no_ node in the hash table and no inflight data means that the
+ * stream(s) have been removed thus data is guaranteed to be available for
+ * analysis from the trace files.
*/
+data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 0;
-data_not_pending:
+data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
struct lttcomm_sock data_sock;
struct lttng_ht_node_ulong node;
- /* Session id on the relayd side for the sockets. */
- uint64_t session_id;
+ /* Session id on both sides for the sockets. */
+ uint64_t relayd_session_id;
+ uint64_t sessiond_session_id;
};
/*
error:
return ret;
}
+
+/*
+ * Begin a data pending command for a specific session id.
+ */
+int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id)
+{
+ int ret;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd begin data pending");
+
+ msg.session_id = htobe64(id);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -reply.ret_code;
+ ERR("Relayd begin data pending replied error %d", ret);
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * End a data pending command for a specific session id.
+ *
+ * Return 0 on success and set is_data_inflight to 0 if no data is being
+ * streamed or 1 if it is the case.
+ */
+int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+ unsigned int *is_data_inflight)
+{
+ int ret;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd end data pending");
+
+ msg.session_id = htobe64(id);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+ if (reply.ret_code < 0) {
+ ret = reply.ret_code;
+ goto error;
+ }
+
+ *is_data_inflight = reply.ret_code;
+
+ DBG("Relayd end data pending is data inflight: %d", reply.ret_code);
+
+ return 0;
+
+error:
+ return ret;
+}
int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
uint64_t last_net_seq_num);
int relayd_quiescent_control(struct lttcomm_sock *sock);
+int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id);
+int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+ unsigned int *is_data_inflight);
#endif /* _RELAYD_H */
uint64_t last_net_seq_num; /* Sequence number of the last packet */
} __attribute__ ((__packed__));
+struct lttcomm_relayd_begin_data_pending {
+ uint64_t session_id;
+} __attribute__ ((__packed__));
+
+struct lttcomm_relayd_end_data_pending {
+ uint64_t session_id;
+} __attribute__ ((__packed__));
+
#endif /* _RELAYD_COMM */
LTTNG_ENABLE_EVENT_WITH_FILTER = 32,
LTTNG_HEALTH_CHECK = 33,
LTTNG_DATA_PENDING = 34,
+ RELAYD_BEGIN_DATA_PENDING = 35,
+ RELAYD_END_DATA_PENDING = 36,
};
/*