Fix: consumer: 64-bit index for relayd rather than 32-bit (v2)
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 28 Jun 2013 15:26:04 +0000 (11:26 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 28 Jun 2013 15:43:30 +0000 (11:43 -0400)
Relayd "unique" ids wrap every 32-bit, and in some cases, negative
values are considered as error.

Change this to make the error value specifically -1ULL, use a direct
comparison (since we use an unsigned 64-bit integer, comparison with 0
becomes incorrect).

Since we now use a 64-bit ID, it is assumed to _never_ wrap-around
(remember, value -1ULL is an _error_). Therefore,
consumer_add_relayd_socket() can become much more strict than it was:
instead of accepting re-use of net_seq_idx, we can now assert that upon
LTTNG_STREAM_CONTROL socket, we have indeed allocated a relayd object,
and upon LTTNG_STREAM_DATA, we have found a relayd object.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/ust-consumer/ust-consumer.c

index 0518f44ca902e8b55054d93a28a17314b89a740f..6830084e8c5a959f7407f80e0a35061089a738a2 100644 (file)
@@ -165,20 +165,20 @@ static struct lttng_consumer_stream *find_stream(uint64_t key,
        return stream;
 }
 
-static void steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
        rcu_read_lock();
        stream = find_stream(key, ht);
        if (stream) {
-               stream->key = -1ULL;
+               stream->key = (uint64_t) -1ULL;
                /*
                 * We don't want the lookup to match, but we still need
                 * to iterate on this stream when iterating over the hash table. Just
                 * change the node key.
                 */
-               stream->node.key = -1ULL;
+               stream->node.key = (uint64_t) -1ULL;
        }
        rcu_read_unlock();
 }
@@ -370,13 +370,13 @@ static void cleanup_relayd_ht(void)
  * It's atomically set without having the stream mutex locked which is fine
  * because we handle the write/read race with a pipe wakeup for each thread.
  */
-static void update_endpoint_status_by_netidx(int net_seq_idx,
+static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                enum consumer_endpoint_status status)
 {
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
-       DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+       DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
        rcu_read_lock();
 
@@ -409,7 +409,7 @@ static void update_endpoint_status_by_netidx(int net_seq_idx,
 static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
                struct lttng_consumer_local_data *ctx)
 {
-       int netidx;
+       uint64_t netidx;
 
        assert(relayd);
 
@@ -637,12 +637,12 @@ end:
  * Allocate and return a consumer relayd socket.
  */
 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
-               int net_seq_idx)
+               uint64_t net_seq_idx)
 {
        struct consumer_relayd_sock_pair *obj = NULL;
 
-       /* Negative net sequence index is a failure */
-       if (net_seq_idx < 0) {
+       /* net sequence index of -1 is a failure */
+       if (net_seq_idx == (uint64_t) -1ULL) {
                goto error;
        }
 
@@ -1259,7 +1259,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        rcu_read_lock();
 
        /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != -1) {
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
                        goto end;
@@ -1458,7 +1458,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        rcu_read_lock();
 
        /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != -1) {
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
                        goto end;
@@ -2996,7 +2996,7 @@ void lttng_consumer_init(void)
  * This will create a relayd socket pair and add it to the relayd hash table.
  * The caller MUST acquire a RCU read side lock before calling it.
  */
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
                struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
@@ -3008,11 +3008,12 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        assert(ctx);
        assert(relayd_sock);
 
-       DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+       DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
 
        /* Get relayd reference if exists. */
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd == NULL) {
+               assert(sock_type == LTTNG_STREAM_CONTROL);
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
@@ -3028,6 +3029,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                 * we can notify the session daemon and continue our work without
                 * killing everything.
                 */
+       } else {
+               /*
+                * relayd key should never be found for control socket.
+                */
+               assert(sock_type != LTTNG_STREAM_CONTROL);
        }
 
        /* First send a status message before receiving the fds. */
@@ -3082,16 +3088,18 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
                ret = lttcomm_create_sock(&relayd->control_sock.sock);
-               /* Immediately try to close the created socket if valid. */
-               if (relayd->control_sock.sock.fd >= 0) {
-                       if (close(relayd->control_sock.sock.fd)) {
-                               PERROR("close relayd control socket");
-                       }
-               }
                /* Handle create_sock error. */
                if (ret < 0) {
                        goto error;
                }
+               /*
+                * Close the socket created internally by
+                * lttcomm_create_sock, so we can replace it by the one
+                * received from sessiond.
+                */
+               if (close(relayd->control_sock.sock.fd)) {
+                       PERROR("close");
+               }
 
                /* Assign new file descriptor */
                relayd->control_sock.sock.fd = fd;
@@ -3128,16 +3136,18 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
                ret = lttcomm_create_sock(&relayd->data_sock.sock);
-               /* Immediately try to close the created socket if valid. */
-               if (relayd->data_sock.sock.fd >= 0) {
-                       if (close(relayd->data_sock.sock.fd)) {
-                               PERROR("close relayd data socket");
-                       }
-               }
                /* Handle create_sock error. */
                if (ret < 0) {
                        goto error;
                }
+               /*
+                * Close the socket created internally by
+                * lttcomm_create_sock, so we can replace it by the one
+                * received from sessiond.
+                */
+               if (close(relayd->data_sock.sock.fd)) {
+                       PERROR("close");
+               }
 
                /* Assign new file descriptor */
                relayd->data_sock.sock.fd = fd;
index ca567d7ff8844dcdb99b5aab0bb20bf4897e4256..fa503ea111836c18335eb7863123275daa2a6edc 100644 (file)
@@ -118,8 +118,8 @@ struct lttng_consumer_channel {
        /* UID and GID of the channel. */
        uid_t uid;
        gid_t gid;
-       /* Relayd id of the channel. -1 if it does not apply. */
-       int64_t relayd_id;
+       /* Relayd id of the channel. -1ULL if it does not apply. */
+       uint64_t relayd_id;
        /*
         * Number of streams NOT initialized yet. This is used in order to not
         * delete this channel if streams are getting initialized.
@@ -275,7 +275,7 @@ struct lttng_consumer_stream {
  */
 struct consumer_relayd_sock_pair {
        /* Network sequence number. */
-       int64_t net_seq_idx;
+       uint64_t net_seq_idx;
        /* Number of stream associated with this relayd */
        unsigned int refcount;
 
@@ -510,7 +510,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel);
 
 /* lttng-relayd consumer command */
 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
-               int net_seq_idx);
+               uint64_t net_seq_idx);
 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
 struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
@@ -546,7 +546,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
                unsigned int sessiond_id);
index 432f43a60c92d78aff436370e717ca382231d466..18eb507dec003a08381f6c6a17f6da9997769850 100644 (file)
@@ -113,7 +113,7 @@ error:
  */
 static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
                const char *pathname, const char *name, uid_t uid, gid_t gid,
-               int relayd_id, uint64_t key, enum lttng_event_output output,
+               uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
                uint64_t tracefile_size, uint64_t tracefile_count,
                uint64_t session_id_per_pid, unsigned int monitor)
 {
This page took 0.031426 seconds and 4 git commands to generate.