session->id = ++last_relay_session_id;
session->sock = cmd->sock;
+ session->minor = cmd->minor;
+ session->major = cmd->major;
cmd->session = session;
reply.session_id = htobe64(session->id);
return ret;
}
+/*
+ * Handle index for a data stream.
+ *
+ * RCU read side lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
+ int rotate_index)
+{
+ int ret = 0, index_created = 0;
+ uint64_t stream_id, data_offset;
+ struct relay_index *index, *wr_index = NULL;
+
+ assert(stream);
+
+ stream_id = stream->stream_handle;
+ /* Get data offset because we are about to update the index. */
+ data_offset = htobe64(stream->tracefile_size_current);
+
+ /*
+ * Lookup for an existing index for that stream id/sequence number. If on
+ * exists, the control thread already received the data for it thus we need
+ * to write it on disk.
+ */
+ index = relay_index_find(stream_id, net_seq_num);
+ if (!index) {
+ /* A successful creation will add the object to the HT. */
+ index = relay_index_create(stream_id, net_seq_num);
+ if (!index) {
+ ret = -1;
+ goto error;
+ }
+ index_created = 1;
+ }
+
+ if (rotate_index || stream->index_fd < 0) {
+ index->to_close_fd = stream->index_fd;
+ ret = index_create_file(stream->path_name, stream->channel_name,
+ relayd_uid, relayd_gid, stream->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ /* This will close the stream's index fd if one. */
+ relay_index_free_safe(index);
+ goto error;
+ }
+ stream->index_fd = ret;
+ }
+ index->fd = stream->index_fd;
+ index->index_data.offset = data_offset;
+
+ if (index_created) {
+ /*
+ * Try to add the relay index object to the hash table. If an object
+ * already exist, destroy back the index created and set the data.
+ */
+ relay_index_add(index, &wr_index);
+ if (wr_index) {
+ /* Copy back data from the created index. */
+ wr_index->fd = index->fd;
+ wr_index->to_close_fd = index->to_close_fd;
+ wr_index->index_data.offset = data_offset;
+ free(index);
+ }
+ } else {
+ /* The index already exists so write it on disk. */
+ wr_index = index;
+ }
+
+ /* Do we have a writable ready index to write on disk. */
+ if (wr_index) {
+ ret = relay_index_write(wr_index->fd, wr_index);
+ if (ret < 0) {
+ goto error;
+ }
+ stream->total_index_received++;
+ }
+
+error:
+ return ret;
+}
+
/*
* relay_process_data: Process the data received on the data socket
*/
static
int relay_process_data(struct relay_command *cmd)
{
- int ret = 0, rotate_index = 0, index_created = 0;
+ int ret = 0, rotate_index = 0;
struct relay_stream *stream;
- struct relay_index *index, *wr_index = NULL;
struct lttcomm_relayd_data_hdr data_hdr;
- uint64_t stream_id, data_offset;
+ uint64_t stream_id;
uint64_t net_seq_num;
uint32_t data_size;
rotate_index = 1;
}
- /* Get data offset because we are about to update the index. */
- data_offset = htobe64(stream->tracefile_size_current);
-
/*
- * Lookup for an existing index for that stream id/sequence number. If on
- * exists, the control thread already received the data for it thus we need
- * to write it on disk.
+ * Index are handled in protocol version 2.4 and above. Also, snapshot and
+ * index are NOT supported.
*/
- index = relay_index_find(stream_id, net_seq_num);
- if (!index) {
- /* A successful creation will add the object to the HT. */
- index = relay_index_create(stream->stream_handle, net_seq_num);
- if (!index) {
- goto end_rcu_unlock;
- }
- index_created = 1;
- }
-
- if (rotate_index || stream->index_fd < 0) {
- index->to_close_fd = stream->index_fd;
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
+ if (stream->session->minor >= 4 && !stream->session->snapshot) {
+ ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
- /* This will close the stream's index fd if one. */
- relay_index_free_safe(index);
goto end_rcu_unlock;
}
- stream->index_fd = ret;
- }
- index->fd = stream->index_fd;
- index->index_data.offset = data_offset;
-
- if (index_created) {
- /*
- * Try to add the relay index object to the hash table. If an object
- * already exist, destroy back the index created and set the data.
- */
- relay_index_add(index, &wr_index);
- if (wr_index) {
- /* Copy back data from the created index. */
- wr_index->fd = index->fd;
- wr_index->to_close_fd = index->to_close_fd;
- wr_index->index_data.offset = data_offset;
- free(index);
- }
- } else {
- /* The index already exists so write it on disk. */
- wr_index = index;
- }
-
- /* Do we have a writable ready index to write on disk. */
- if (wr_index) {
- /* Starting at 2.4, create the index file if none available. */
- if (cmd->minor >= 4 && stream->index_fd < 0) {
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
- stream->index_fd = ret;
- }
-
- ret = relay_index_write(wr_index->fd, wr_index);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
- stream->total_index_received++;
}
+ /* Write data to stream output fd. */
do {
ret = write(stream->fd, data_buffer, data_size);
} while (ret < 0 && errno == EINTR);
&iter, relay_connection, sessions_ht);
}
}
-error_poll_create:
- {
- struct relay_index *index;
- cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
- relay_index_delete(index);
- }
- lttng_ht_destroy(indexes_ht);
- }
rcu_read_unlock();
+error_poll_create:
+ lttng_ht_destroy(indexes_ht);
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
*/
static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
uint64_t *session_id, char *session_name, char *hostname,
- int session_live_timer)
+ int session_live_timer, unsigned int snapshot)
{
int ret;
struct lttcomm_relayd_create_session_2_4 msg;
strncpy(msg.session_name, session_name, sizeof(msg.session_name));
strncpy(msg.hostname, hostname, sizeof(msg.hostname));
msg.live_timer = htobe32(session_live_timer);
+ msg.snapshot = htobe32(snapshot);
/* Send command */
ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
* a lttng error code from the relayd.
*/
int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id,
- char *session_name, char *hostname, int session_live_timer)
+ char *session_name, char *hostname, int session_live_timer,
+ unsigned int snapshot)
{
int ret;
struct lttcomm_relayd_status_session reply;
ret = relayd_create_session_2_1(rsock, session_id);
case 4:
default:
- ret = relayd_create_session_2_4(rsock, session_id,
- session_name, hostname,
- session_live_timer);
+ ret = relayd_create_session_2_4(rsock, session_id, session_name,
+ hostname, session_live_timer, snapshot);
}
if (ret < 0) {