Fix: consumerd channel destroy race
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 1 May 2013 20:50:55 +0000 (16:50 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 1 May 2013 20:50:55 +0000 (16:50 -0400)
The LTTNG_CONSUMER_DESTROY_CHANNEL command is used to handle errors. It
should not call destroy_channel(), because at that point the channel
management thread has ownership of the channel. Send a notification to
the channel management thread asking it to destroy the channel instead.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/ust-consumer/ust-consumer.c

index 0e9c52e835dc594d2fa773452045208a26ea3cac..c4518cc03a7d8891715704cd40a8983f50d08650 100644 (file)
@@ -50,12 +50,14 @@ struct lttng_consumer_global_data consumer_data = {
 
 enum consumer_channel_action {
        CONSUMER_CHANNEL_ADD,
+       CONSUMER_CHANNEL_DEL,
        CONSUMER_CHANNEL_QUIT,
 };
 
 struct consumer_channel_msg {
        enum consumer_channel_action action;
-       struct lttng_consumer_channel *chan;
+       struct lttng_consumer_channel *chan;    /* add */
+       uint64_t key;                           /* del */
 };
 
 /*
@@ -91,6 +93,7 @@ static void notify_thread_pipe(int wpipe)
 
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
+               uint64_t key,
                enum consumer_channel_action action)
 {
        struct consumer_channel_msg msg;
@@ -103,8 +106,15 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
        } while (ret < 0 && errno == EINTR);
 }
 
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
+}
+
 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel **chan,
+               uint64_t *key,
                enum consumer_channel_action *action)
 {
        struct consumer_channel_msg msg;
@@ -116,6 +126,7 @@ static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
        if (ret > 0) {
                *action = msg.action;
                *chan = msg.chan;
+               *key = msg.key;
        }
        return ret;
 }
@@ -902,7 +913,7 @@ end:
 
        if (!ret && channel->wait_fd != -1 &&
                        channel->metadata_stream == NULL) {
-               notify_channel_pipe(ctx, channel, CONSUMER_CHANNEL_ADD);
+               notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
        return ret;
 }
@@ -2693,8 +2704,9 @@ restart:
                                        continue;
                                } else if (revents & LPOLLIN) {
                                        enum consumer_channel_action action;
+                                       uint64_t key;
 
-                                       ret = read_channel_pipe(ctx, &chan, &action);
+                                       ret = read_channel_pipe(ctx, &chan, &key, &action);
                                        if (ret <= 0) {
                                                ERR("Error reading channel pipe");
                                                continue;
@@ -2713,6 +2725,27 @@ restart:
                                                lttng_poll_add(&events, chan->wait_fd,
                                                                LPOLLIN | LPOLLPRI);
                                                break;
+                                       case CONSUMER_CHANNEL_DEL:
+                                       {
+                                               chan = consumer_find_channel(key);
+                                               if (!chan) {
+                                                       ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+                                                       break;
+                                               }
+                                               lttng_poll_del(&events, chan->wait_fd);
+                                               ret = lttng_ht_del(channel_ht, &iter);
+                                               assert(ret == 0);
+                                               consumer_close_channel_streams(chan);
+
+                                               /*
+                                                * Release our own refcount. Force channel deletion even if
+                                                * streams were not initialized.
+                                                */
+                                               if (!uatomic_sub_return(&chan->refcount, 1)) {
+                                                       consumer_del_channel(chan);
+                                               }
+                                               goto restart;
+                                       }
                                        case CONSUMER_CHANNEL_QUIT:
                                                /*
                                                 * Remove the pipe from the poll set and continue the loop
@@ -2933,7 +2966,7 @@ end:
         */
        notify_thread_pipe(ctx->consumer_data_pipe[1]);
 
-       notify_channel_pipe(ctx, NULL, CONSUMER_CHANNEL_QUIT);
+       notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
index 844310681db21ccddf86b9ad47dfa5cb249bef2d..43989e4c55757d760cad1b4fdf93ca29b739453c 100644 (file)
@@ -527,5 +527,7 @@ int consumer_data_pending(uint64_t id);
 int consumer_send_status_msg(int sock, int ret_code);
 int consumer_send_status_channel(int sock,
                struct lttng_consumer_channel *channel);
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+               uint64_t key);
 
 #endif /* LIB_CONSUMER_H */
index 8c15c7e135016098f3cb90a94923ec9eb3fc9d93..7d9444cd00b84300450e69676b6d42a668032e67 100644 (file)
@@ -998,17 +998,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
        {
                uint64_t key = msg.u.destroy_channel.key;
-               struct lttng_consumer_channel *channel;
-
-               channel = consumer_find_channel(key);
-               if (!channel) {
-                       ERR("UST consumer get channel key %" PRIu64 " not found", key);
-                       ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-                       goto end_msg_sessiond;
-               }
-
-               destroy_channel(channel);
 
+               /*
+                * Only called if streams have not been sent to stream
+                * manager thread. However, channel has been sent to
+                * channel manager thread.
+                */
+               notify_thread_del_channel(ctx, key);
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_CLOSE_METADATA:
This page took 0.030145 seconds and 4 git commands to generate.