#include <common/common.h>
#include <common/utils.h>
+#include "lttng-relayd.h"
#include "index.h"
/*
*
* Return index object or else NULL on error.
*/
-struct relay_index *relay_index_find(uint64_t stream_id,
- uint64_t net_seq_num, struct lttng_ht *ht)
+struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
{
struct lttng_ht_node_two_u64 *node;
struct lttng_ht_iter iter;
struct lttng_ht_two_u64 key;
struct relay_index *index = NULL;
- assert(ht);
-
DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
stream_id, net_seq_num);
key.key1 = stream_id;
key.key2 = net_seq_num;
- lttng_ht_lookup(ht, (void *)(&key), &iter);
+ lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
node = lttng_ht_iter_get_node_two_u64(&iter);
if (node == NULL) {
goto end;
*
* RCU read side lock MUST be acquired.
*/
-void relay_index_add(struct relay_index *index, struct lttng_ht *ht,
- struct relay_index **_index)
+void relay_index_add(struct relay_index *index, struct relay_index **_index)
{
struct cds_lfht_node *node_ptr;
assert(index);
- assert(ht);
- assert(_index);
DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
index->key.key1, index->key.key2);
- node_ptr = cds_lfht_add_unique(ht->ht,
- ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
- ht->match_fct, (void *) &index->index_n.key,
+ node_ptr = cds_lfht_add_unique(indexes_ht->ht,
+ indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
+ indexes_ht->match_fct, (void *) &index->index_n.key,
&index->index_n.node);
if (node_ptr != &index->index_n.node) {
*_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
*
* Return 0 on success else a negative value.
*/
-int relay_index_write(int fd, struct relay_index *index, struct lttng_ht *ht)
+int relay_index_write(int fd, struct relay_index *index)
{
int ret;
struct lttng_ht_iter iter;
/* Delete index from hash table. */
iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(ht, &iter);
+ ret = lttng_ht_del(indexes_ht, &iter);
assert(!ret);
call_rcu(&index->rcu_node, deferred_free_relay_index);
*
* RCU read side lock MUST be acquired.
*/
-void relay_index_delete(struct relay_index *index, struct lttng_ht *ht)
+void relay_index_delete(struct relay_index *index)
{
int ret;
struct lttng_ht_iter iter;
/* Delete index from hash table. */
iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(ht, &iter);
+ ret = lttng_ht_del(indexes_ht, &iter);
assert(!ret);
}
/*
* Destroy every relay index with the given stream id as part of the key.
*/
-void relay_index_destroy_by_stream_id(uint64_t stream_id, struct lttng_ht *ht)
+void relay_index_destroy_by_stream_id(uint64_t stream_id)
{
struct lttng_ht_iter iter;
struct relay_index *index;
- assert(ht);
- assert(ht->ht);
-
rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, index, index_n.node) {
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
if (index->key.key1 == stream_id) {
- relay_index_delete(index, ht);
+ relay_index_delete(index);
relay_index_free_safe(index);
}
}
struct relay_index *relay_index_create(uint64_t stream_id,
uint64_t net_seq_num);
-struct relay_index *relay_index_find(uint64_t stream_id,
- uint64_t net_seq_num, struct lttng_ht *ht);
-void relay_index_add(struct relay_index *index, struct lttng_ht *ht,
- struct relay_index **_index);
-int relay_index_write(int fd, struct relay_index *index, struct lttng_ht *ht);
+struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num);
+void relay_index_add(struct relay_index *index, struct relay_index **_index);
+int relay_index_write(int fd, struct relay_index *index);
void relay_index_free(struct relay_index *index);
void relay_index_free_safe(struct relay_index *index);
-void relay_index_delete(struct relay_index *index, struct lttng_ht *ht);
-void relay_index_destroy_by_stream_id(uint64_t stream_id, struct lttng_ht *ht);
+void relay_index_delete(struct relay_index *index);
+void relay_index_destroy_by_stream_id(uint64_t stream_id);
#endif /* _RELAY_INDEX_H */
extern struct lttng_ht *relay_streams_ht;
extern struct lttng_ht *viewer_streams_ht;
+extern struct lttng_ht *indexes_ht;
struct relay_stream *relay_stream_find_by_id(uint64_t stream_id);
static char *data_buffer;
static unsigned int data_buffer_size;
-/* Global hash table that stores relay index object. */
-static struct lttng_ht *indexes_ht;
-
/* We need those values for the file/dir creation. */
static uid_t relayd_uid;
static gid_t relayd_gid;
/* Global relay viewer stream hash table. */
struct lttng_ht *viewer_streams_ht;
+/* Global hash table that stores relay index object. */
+struct lttng_ht *indexes_ht;
+
/*
* usage function on stderr
*/
destroy_stream(stream, cmd->ctf_traces_ht);
}
/* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle, indexes_ht);
+ relay_index_destroy_by_stream_id(stream->stream_handle);
}
/* Make this session not visible anymore. */
*/
static
int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *indexes_ht)
+ struct relay_command *cmd)
{
int ret, send_ret, index_created = 0;
struct relay_session *session = cmd->session;
uint64_t net_seq_num;
assert(cmd);
- assert(indexes_ht);
DBG("Relay receiving index");
stream->beacon_ts_end = -1ULL;
}
- index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
+ index = relay_index_find(stream->stream_handle, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* already exist, destroy back the index created, set the data in this
* object and write it on disk.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
copy_index_control_data(wr_index, &index_info);
free(index);
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
ret = relay_end_data_pending(recv_hdr, cmd);
break;
case RELAYD_SEND_INDEX:
- ret = relay_recv_index(recv_hdr, cmd, indexes_ht);
+ ret = relay_recv_index(recv_hdr, cmd);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
* relay_process_data: Process the data received on the data socket
*/
static
-int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht)
+int relay_process_data(struct relay_command *cmd)
{
int ret = 0, rotate_index = 0, index_created = 0;
struct relay_stream *stream;
* exists, the control thread already received the data for it thus we need
* to write it on disk.
*/
- index = relay_index_find(stream_id, net_seq_num, indexes_ht);
+ index = relay_index_find(stream_id, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* Try to add the relay index object to the hash table. If an object
* already exist, destroy back the index created and set the data.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
/* Copy back data from the created index. */
wr_index->fd = index->fd;
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht);
+ ret = relay_process_data(relay_connection);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
{
struct relay_index *index;
cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
- relay_index_delete(index, indexes_ht);
+ relay_index_delete(index);
}
lttng_ht_destroy(indexes_ht);
}