sessiond: notification: synchronize notification client (and list)
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 11 Feb 2020 04:06:19 +0000 (23:06 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 11 Aug 2020 20:58:27 +0000 (16:58 -0400)
Introduce reference counting to notification_client_list and add
locks to both notification_client and notification_client list.

Of important note, this is in preparation for the introduction of
an action executor thread. The aim of this change is not to make
any part of the notitication sub-system thread-safe in any "general"
sense.

The reference counting and locks are introduced to protect a very
specific usage scenario.

The main thread of the notification subsystem and the action executor
will interact through triggers, evaluations, and client lists.

If the action executor needs to send a notification to a list of
client during the execution of an action group, it obtains the client
list and acquires a reference to it. It then locks the list to iterate
on it, allowing it to send the notification(s) to all subscribed
clients.

Holding the list lock prevents the main thread from disconnecting and
subsequently destroying a client. Holding a reference to the list also
prevents the list from being reclaimed due to a concurent 'unregister
trigger' operation.

No provision for other access scenarios are taken into account.

Squashed fix, otherwise the tests would hang.
Fix: don't hold client lock while handling subscription changes

Holding the client's lock while handling subscription changes causes a
lock inversion between the client list lock and the client lock.

This happens when a client subscribes to a condition that evaluates to
'true' at the time of the subscription. When this happens, a
notification is sent right away and that communication will attempt to
acquire the client lock.

Holding the client lock for such a long period is not needed anyhow
and we can simply protect the communication state when it is actually
modified/used.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I7f38f81fa8bc32e5384538acdffab0824862cff2

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I88a9f66d31e3127b2bcc3f04016c364e0b3fe9ce

src/bin/lttng-sessiond/notification-thread-events.c

index a2fa8a1ca854933bdf505508002b3d113f609ce4..285bcf951aafe3a5bb2801777e7a13eb06dc5ec7 100644 (file)
@@ -122,15 +122,40 @@ struct notification_client_list_element {
        struct cds_list_head node;
 };
 
+/*
+ * Thread safety of notification_client and notification_client_list.
+ *
+ * The notification thread (main thread) and the action executor
+ * interact through client lists. Hence, when the action executor
+ * thread looks-up the list of clients subscribed to a given
+ * condition, it will acquire a reference to the list and lock it
+ * while attempting to communicate with the various clients.
+ *
+ * It is not necessary to reference-count clients as they are guaranteed
+ * to be 'alive' if they are present in a list and that list is locked. Indeed,
+ * removing references to the client from those subscription lists is part of
+ * the work performed on destruction of a client.
+ *
+ * No provision for other access scenarios are taken into account;
+ * this is the bare minimum to make these accesses safe and the
+ * notification thread's state is _not_ "thread-safe" in any general
+ * sense.
+ */
 struct notification_client_list {
+       pthread_mutex_t lock;
+       struct urcu_ref ref;
        const struct lttng_trigger *trigger;
        struct cds_list_head list;
-       struct cds_lfht_node notification_trigger_ht_node;
+       /* Weak reference to container. */
+       struct cds_lfht *notification_trigger_clients_ht;
+       struct cds_lfht_node notification_trigger_clients_ht_node;
        /* call_rcu delayed reclaim. */
        struct rcu_head rcu_node;
 };
 
 struct notification_client {
+       /* Nests within the notification_client_list lock. */
+       pthread_mutex_t lock;
        notification_client_id id;
        int socket;
        /* Client protocol version. */
@@ -152,10 +177,11 @@ struct notification_client {
        struct cds_lfht_node client_id_ht_node;
        struct {
                /*
-                * If a client's communication is inactive, it means a fatal
-                * error (either a protocol error or the socket API returned
-                * a fatal error). No further communication should be attempted;
-                * the client is queued for clean-up.
+                * If a client's communication is inactive, it means that a
+                * fatal error has occurred (could be either a protocol error or
+                * the socket API returned a fatal error). No further
+                * communication should be attempted; the client is queued for
+                * clean-up.
                 */
                bool active;
                struct {
@@ -368,7 +394,7 @@ int match_client_list_condition(struct cds_lfht_node *node, const void *key)
        assert(condition_key);
 
        client_list = caa_container_of(node, struct notification_client_list,
-                       notification_trigger_ht_node);
+                       notification_trigger_clients_ht_node);
        condition = lttng_trigger_get_const_condition(client_list->trigger);
 
        return !!lttng_condition_is_equal(condition_key, condition);
@@ -696,7 +722,90 @@ error:
        return NULL;
 }
 
-/* RCU read lock must be held by the caller. */
+static
+bool notification_client_list_get(struct notification_client_list *list)
+{
+       return urcu_ref_get_unless_zero(&list->ref);
+}
+
+static
+void free_notification_client_list_rcu(struct rcu_head *node)
+{
+       free(caa_container_of(node, struct notification_client_list,
+                       rcu_node));
+}
+
+static
+void notification_client_list_release(struct urcu_ref *list_ref)
+{
+       struct notification_client_list *list =
+                       container_of(list_ref, typeof(*list), ref);
+       struct notification_client_list_element *client_list_element, *tmp;
+
+       if (list->notification_trigger_clients_ht) {
+               rcu_read_lock();
+               cds_lfht_del(list->notification_trigger_clients_ht,
+                               &list->notification_trigger_clients_ht_node);
+               rcu_read_unlock();
+               list->notification_trigger_clients_ht = NULL;
+       }
+       cds_list_for_each_entry_safe(client_list_element, tmp,
+                                    &list->list, node) {
+               free(client_list_element);
+       }
+       pthread_mutex_destroy(&list->lock);
+       call_rcu(&list->rcu_node, free_notification_client_list_rcu);
+}
+
+static
+struct notification_client_list *notification_client_list_create(
+               const struct lttng_trigger *trigger)
+{
+       struct notification_client_list *client_list =
+                       zmalloc(sizeof(*client_list));
+
+       if (!client_list) {
+               goto error;
+       }
+       pthread_mutex_init(&client_list->lock, NULL);
+       urcu_ref_init(&client_list->ref);
+       cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
+       CDS_INIT_LIST_HEAD(&client_list->list);
+       client_list->trigger = trigger;
+error:
+       return client_list;
+}
+
+static
+void publish_notification_client_list(
+               struct notification_thread_state *state,
+               struct notification_client_list *list)
+{
+       const struct lttng_condition *condition =
+                       lttng_trigger_get_const_condition(list->trigger);
+
+       assert(!list->notification_trigger_clients_ht);
+
+       list->notification_trigger_clients_ht =
+                       state->notification_trigger_clients_ht;
+
+       rcu_read_lock();
+       cds_lfht_add(state->notification_trigger_clients_ht,
+                       lttng_condition_hash(condition),
+                       &list->notification_trigger_clients_ht_node);
+       rcu_read_unlock();
+}
+
+static
+void notification_client_list_put(struct notification_client_list *list)
+{
+       if (!list) {
+               return;
+       }
+       return urcu_ref_put(&list->ref, notification_client_list_release);
+}
+
+/* Provides a reference to the returned list. */
 static
 struct notification_client_list *get_client_list_from_condition(
        struct notification_thread_state *state,
@@ -704,20 +813,25 @@ struct notification_client_list *get_client_list_from_condition(
 {
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
+       struct notification_client_list *list = NULL;
 
+       rcu_read_lock();
        cds_lfht_lookup(state->notification_trigger_clients_ht,
                        lttng_condition_hash(condition),
                        match_client_list_condition,
                        condition,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
+       if (node) {
+               list = container_of(node, struct notification_client_list,
+                               notification_trigger_clients_ht_node);
+               list = notification_client_list_get(list) ? list : NULL;
+       }
 
-       return node ? caa_container_of(node,
-                       struct notification_client_list,
-                       notification_trigger_ht_node) : NULL;
+       rcu_read_unlock();
+       return list;
 }
 
-/* This function must be called with the RCU read lock held. */
 static
 int evaluate_channel_condition_for_client(
                const struct lttng_condition *condition,
@@ -733,6 +847,8 @@ int evaluate_channel_condition_for_client(
        struct channel_state_sample *last_sample = NULL;
        struct lttng_channel_trigger_list *channel_trigger_list = NULL;
 
+       rcu_read_lock();
+
        /* Find the channel associated with the condition. */
        cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
                        channel_trigger_list, channel_triggers_ht_node) {
@@ -807,6 +923,7 @@ int evaluate_channel_condition_for_client(
        *session_uid = channel_info->session_info->uid;
        *session_gid = channel_info->session_info->gid;
 end:
+       rcu_read_unlock();
        return ret;
 }
 
@@ -842,7 +959,6 @@ end:
        return session_name;
 }
 
-/* This function must be called with the RCU read lock held. */
 static
 int evaluate_session_condition_for_client(
                const struct lttng_condition *condition,
@@ -856,6 +972,7 @@ int evaluate_session_condition_for_client(
        const char *session_name;
        struct session_info *session_info = NULL;
 
+       rcu_read_lock();
        session_name = get_condition_session_name(condition);
 
        /* Find the session associated with the trigger. */
@@ -909,10 +1026,10 @@ int evaluate_session_condition_for_client(
 end_session_put:
        session_info_put(session_info);
 end:
+       rcu_read_unlock();
        return ret;
 }
 
-/* This function must be called with the RCU read lock held. */
 static
 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
                const struct lttng_condition *condition,
@@ -921,7 +1038,9 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger,
 {
        int ret;
        struct lttng_evaluation *evaluation = NULL;
-       struct notification_client_list client_list = { 0 };
+       struct notification_client_list client_list = {
+               .lock = PTHREAD_MUTEX_INITIALIZER,
+       };
        struct notification_client_list_element client_list_element = { 0 };
        uid_t object_uid = 0;
        gid_t object_gid = 0;
@@ -963,7 +1082,7 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger,
         * Create a temporary client list with the client currently
         * subscribing.
         */
-       cds_lfht_node_init(&client_list.notification_trigger_ht_node);
+       cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
        CDS_INIT_LIST_HEAD(&client_list.list);
        client_list.trigger = trigger;
 
@@ -987,7 +1106,7 @@ int notification_thread_client_subscribe(struct notification_client *client,
                enum lttng_notification_channel_status *_status)
 {
        int ret = 0;
-       struct notification_client_list *client_list;
+       struct notification_client_list *client_list = NULL;
        struct lttng_condition_list_element *condition_list_element = NULL;
        struct notification_client_list_element *client_list_element = NULL;
        enum lttng_notification_channel_status status =
@@ -1016,8 +1135,6 @@ int notification_thread_client_subscribe(struct notification_client *client,
                goto error;
        }
 
-       rcu_read_lock();
-
        /*
         * Add the newly-subscribed condition to the client's subscription list.
         */
@@ -1033,20 +1150,24 @@ int notification_thread_client_subscribe(struct notification_client *client,
                 * since this trigger is not registered yet.
                 */
                free(client_list_element);
-               goto end_unlock;
+               goto end;
        }
 
        /*
         * The condition to which the client just subscribed is evaluated
         * at this point so that conditions that are already TRUE result
         * in a notification being sent out.
+        *
+        * The client_list's trigger is used without locking the list itself.
+        * This is correct since the list doesn't own the trigger and the
+        * object is immutable.
         */
        if (evaluate_condition_for_client(client_list->trigger, condition,
                        client, state)) {
                WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
                ret = -1;
                free(client_list_element);
-               goto end_unlock;
+               goto end;
        }
 
        /*
@@ -1056,13 +1177,17 @@ int notification_thread_client_subscribe(struct notification_client *client,
         */
        client_list_element->client = client;
        CDS_INIT_LIST_HEAD(&client_list_element->node);
+
+       pthread_mutex_lock(&client_list->lock);
        cds_list_add(&client_list_element->node, &client_list->list);
-end_unlock:
-       rcu_read_unlock();
+       pthread_mutex_unlock(&client_list->lock);
 end:
        if (_status) {
                *_status = status;
        }
+       if (client_list) {
+               notification_client_list_put(client_list);
+       }
        return ret;
 error:
        free(condition_list_element);
@@ -1118,23 +1243,24 @@ int notification_thread_client_unsubscribe(
         * Remove the client from the list of clients interested the trigger
         * matching the condition.
         */
-       rcu_read_lock();
        client_list = get_client_list_from_condition(state, condition);
        if (!client_list) {
-               goto end_unlock;
+               goto end;
        }
 
+       pthread_mutex_lock(&client_list->lock);
        cds_list_for_each_entry_safe(client_list_element, client_tmp,
                        &client_list->list, node) {
-               if (client_list_element->client->socket != client->socket) {
+               if (client_list_element->client->id != client->id) {
                        continue;
                }
                cds_list_del(&client_list_element->node);
                free(client_list_element);
                break;
        }
-end_unlock:
-       rcu_read_unlock();
+       pthread_mutex_unlock(&client_list->lock);
+       notification_client_list_put(client_list);
+       client_list = NULL;
 end:
        lttng_condition_destroy(condition);
        if (_status) {
@@ -1153,25 +1279,22 @@ static
 void notification_client_destroy(struct notification_client *client,
                struct notification_thread_state *state)
 {
-       struct lttng_condition_list_element *condition_list_element, *tmp;
-
        if (!client) {
                return;
        }
 
-       /* Release all conditions to which the client was subscribed. */
-       cds_list_for_each_entry_safe(condition_list_element, tmp,
-                       &client->condition_list, node) {
-               (void) notification_thread_client_unsubscribe(client,
-                               condition_list_element->condition, state, NULL);
-       }
-
+       /*
+        * The client object is not reachable by other threads, no need to lock
+        * the client here.
+        */
        if (client->socket >= 0) {
                (void) lttcomm_close_unix_sock(client->socket);
                client->socket = -1;
        }
+       client->communication.active = false;
        lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
        lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
+       pthread_mutex_destroy(&client->lock);
        call_rcu(&client->rcu_node, free_notification_client_rcu);
 }
 
@@ -1203,34 +1326,6 @@ end:
        return client;
 }
 
-/*
- * Call with rcu_read_lock held (and hold for the lifetime of the returned
- * client pointer).
- */
-static
-struct notification_client *get_client_from_id(notification_client_id id,
-               struct notification_thread_state *state)
-{
-       struct cds_lfht_iter iter;
-       struct cds_lfht_node *node;
-       struct notification_client *client = NULL;
-
-       cds_lfht_lookup(state->client_id_ht,
-                       hash_client_id(id),
-                       match_client_id,
-                       &id,
-                       &iter);
-       node = cds_lfht_iter_get_node(&iter);
-       if (!node) {
-               goto end;
-       }
-
-       client = caa_container_of(node, struct notification_client,
-                       client_id_ht_node);
-end:
-       return client;
-}
-
 static
 bool buffer_usage_condition_applies_to_channel(
                const struct lttng_condition *condition,
@@ -1821,6 +1916,7 @@ int handle_notification_thread_command_session_rotation(
                struct notification_client_list *client_list;
                struct lttng_evaluation *evaluation = NULL;
                enum lttng_condition_type condition_type;
+               bool client_list_is_empty;
 
                trigger = trigger_list_element->trigger;
                condition = lttng_trigger_get_const_condition(trigger);
@@ -1844,7 +1940,10 @@ int handle_notification_thread_command_session_rotation(
                client_list = get_client_list_from_condition(state, condition);
                assert(client_list);
 
-               if (cds_list_empty(&client_list->list)) {
+               pthread_mutex_lock(&client_list->lock);
+               client_list_is_empty = cds_list_empty(&client_list->list);
+               pthread_mutex_unlock(&client_list->lock);
+               if (client_list_is_empty) {
                        /*
                         * No clients interested in the evaluation's result,
                         * skip it.
@@ -1864,7 +1963,7 @@ int handle_notification_thread_command_session_rotation(
                        /* Internal error */
                        ret = -1;
                        cmd_result = LTTNG_ERR_UNK;
-                       goto end;
+                       goto put_list;
                }
 
                /* Dispatch evaluation result to all clients. */
@@ -1873,8 +1972,10 @@ int handle_notification_thread_command_session_rotation(
                                session_info->uid,
                                session_info->gid);
                lttng_evaluation_destroy(evaluation);
+put_list:
+               notification_client_list_put(client_list);
                if (caa_unlikely(ret)) {
-                       goto end;
+                       break;
                }
        }
 end:
@@ -2095,14 +2196,11 @@ int handle_notification_thread_command_register_trigger(
         * It is not skipped as this is the only action type currently
         * supported.
         */
-       client_list = zmalloc(sizeof(*client_list));
+       client_list = notification_client_list_create(trigger);
        if (!client_list) {
                ret = -1;
                goto error_free_ht_element;
        }
-       cds_lfht_node_init(&client_list->notification_trigger_ht_node);
-       CDS_INIT_LIST_HEAD(&client_list->list);
-       client_list->trigger = trigger;
 
        /* Build a list of clients to which this new trigger applies. */
        cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
@@ -2114,23 +2212,19 @@ int handle_notification_thread_command_register_trigger(
                client_list_element = zmalloc(sizeof(*client_list_element));
                if (!client_list_element) {
                        ret = -1;
-                       goto error_free_client_list;
+                       goto error_put_client_list;
                }
                CDS_INIT_LIST_HEAD(&client_list_element->node);
                client_list_element->client = client;
                cds_list_add(&client_list_element->node, &client_list->list);
        }
 
-       cds_lfht_add(state->notification_trigger_clients_ht,
-                       lttng_condition_hash(condition),
-                       &client_list->notification_trigger_ht_node);
-
        switch (get_condition_binding_object(condition)) {
        case LTTNG_OBJECT_TYPE_SESSION:
                /* Add the trigger to the list if it matches a known session. */
                ret = bind_trigger_to_matching_session(trigger, state);
                if (ret) {
-                       goto error_free_client_list;
+                       goto error_put_client_list;
                }
                break;
        case LTTNG_OBJECT_TYPE_CHANNEL:
@@ -2140,7 +2234,7 @@ int handle_notification_thread_command_register_trigger(
                 */
                ret = bind_trigger_to_matching_channels(trigger, state);
                if (ret) {
-                       goto error_free_client_list;
+                       goto error_put_client_list;
                }
                break;
        case LTTNG_OBJECT_TYPE_NONE:
@@ -2148,7 +2242,7 @@ int handle_notification_thread_command_register_trigger(
        default:
                ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
                ret = -1;
-               goto error_free_client_list;
+               goto error_put_client_list;
        }
 
        /*
@@ -2176,13 +2270,15 @@ int handle_notification_thread_command_register_trigger(
         * current state. Otherwise, the next evaluation cycle may only see
         * that the evaluations remain the same (true for samples n-1 and n) and
         * the client will never know that the condition has been met.
+        *
+        * No need to lock the list here as it has not been published yet.
         */
        cds_list_for_each_entry_safe(client_list_element, tmp,
                        &client_list->list, node) {
                ret = evaluate_condition_for_client(trigger, condition,
                                client_list_element->client, state);
                if (ret) {
-                       goto error_free_client_list;
+                       goto error_put_client_list;
                }
        }
 
@@ -2190,17 +2286,14 @@ int handle_notification_thread_command_register_trigger(
         * Client list ownership transferred to the
         * notification_trigger_clients_ht.
         */
+       publish_notification_client_list(state, client_list);
        client_list = NULL;
 
        *cmd_result = LTTNG_OK;
-error_free_client_list:
-       if (client_list) {
-               cds_list_for_each_entry_safe(client_list_element, tmp,
-                               &client_list->list, node) {
-                       free(client_list_element);
-               }
-               free(client_list);
-       }
+
+error_put_client_list:
+       notification_client_list_put(client_list);
+
 error_free_ht_element:
        free(trigger_ht_element);
 error:
@@ -2211,13 +2304,6 @@ error:
        return ret;
 }
 
-static
-void free_notification_client_list_rcu(struct rcu_head *node)
-{
-       free(caa_container_of(node, struct notification_client_list,
-                       rcu_node));
-}
-
 static
 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
 {
@@ -2235,7 +2321,6 @@ int handle_notification_thread_command_unregister_trigger(
        struct cds_lfht_node *triggers_ht_node;
        struct lttng_channel_trigger_list *trigger_list;
        struct notification_client_list *client_list;
-       struct notification_client_list_element *client_list_element, *tmp;
        struct lttng_trigger_ht_element *trigger_ht_element = NULL;
        struct lttng_condition *condition = lttng_trigger_get_condition(
                        trigger);
@@ -2287,13 +2372,10 @@ int handle_notification_thread_command_unregister_trigger(
        client_list = get_client_list_from_condition(state, condition);
        assert(client_list);
 
-       cds_list_for_each_entry_safe(client_list_element, tmp,
-                       &client_list->list, node) {
-               free(client_list_element);
-       }
-       cds_lfht_del(state->notification_trigger_clients_ht,
-                       &client_list->notification_trigger_ht_node);
-       call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
+       /* Put new reference and the hashtable's reference. */
+       notification_client_list_put(client_list);
+       notification_client_list_put(client_list);
+       client_list = NULL;
 
        /* Remove trigger from triggers_ht. */
        trigger_ht_element = caa_container_of(triggers_ht_node,
@@ -2429,11 +2511,14 @@ end:
        return ret;
 }
 
+/* Client lock must be acquired by caller. */
 static
 int client_reset_inbound_state(struct notification_client *client)
 {
        int ret;
 
+       ASSERT_LOCKED(client->lock);
+
        ret = lttng_dynamic_buffer_set_size(
                        &client->communication.inbound.buffer, 0);
        assert(!ret);
@@ -2464,12 +2549,16 @@ int handle_notification_thread_client_connect(
                ret = -1;
                goto error;
        }
+       pthread_mutex_init(&client->lock, NULL);
        client->id = state->next_notification_client_id++;
        CDS_INIT_LIST_HEAD(&client->condition_list);
        lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
        lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
        client->communication.inbound.expect_creds = true;
+
+       pthread_mutex_lock(&client->lock);
        ret = client_reset_inbound_state(client);
+       pthread_mutex_unlock(&client->lock);
        if (ret) {
                ERR("[notification-thread] Failed to reset client communication's inbound state");
                ret = 0;
@@ -2524,9 +2613,44 @@ error:
        return ret;
 }
 
-int handle_notification_thread_client_disconnect(
-               int client_socket,
+/* RCU read-lock must be held by the caller. */
+/* Client lock must be held by the caller */
+static
+int notification_thread_client_disconnect(
+               struct notification_client *client,
                struct notification_thread_state *state)
+{
+       int ret;
+       struct lttng_condition_list_element *condition_list_element, *tmp;
+
+       /* Acquire the client lock to disable its communication atomically. */
+       client->communication.active = false;
+       ret = lttng_poll_del(&state->events, client->socket);
+       if (ret) {
+               ERR("[notification-thread] Failed to remove client socket %d from poll set",
+                               client->socket);
+       }
+
+       cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
+       cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
+
+       /* Release all conditions to which the client was subscribed. */
+       cds_list_for_each_entry_safe(condition_list_element, tmp,
+                       &client->condition_list, node) {
+               (void) notification_thread_client_unsubscribe(client,
+                               condition_list_element->condition, state, NULL);
+       }
+
+       /*
+        * Client no longer accessible to other threads (through the
+        * client lists).
+        */
+       notification_client_destroy(client, state);
+       return ret;
+}
+
+int handle_notification_thread_client_disconnect(
+               int client_socket, struct notification_thread_state *state)
 {
        int ret = 0;
        struct notification_client *client;
@@ -2543,15 +2667,9 @@ int handle_notification_thread_client_disconnect(
                goto end;
        }
 
-       ret = lttng_poll_del(&state->events, client_socket);
-       if (ret) {
-               ERR("[notification-thread] Failed to remove client socket from poll set");
-       }
-       cds_lfht_del(state->client_socket_ht,
-                       &client->client_socket_ht_node);
-        cds_lfht_del(state->client_id_ht,
-                       &client->client_id_ht_node);
-       notification_client_destroy(client, state);
+       pthread_mutex_lock(&client->lock);
+       ret = notification_thread_client_disconnect(client, state);
+       pthread_mutex_unlock(&client->lock);
 end:
        rcu_read_unlock();
        return ret;
@@ -2567,11 +2685,13 @@ int handle_notification_thread_client_disconnect_all(
        rcu_read_lock();
        DBG("[notification-thread] Closing all client connections");
        cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
-               client_socket_ht_node) {
+                       client_socket_ht_node) {
                int ret;
 
-               ret = handle_notification_thread_client_disconnect(
-                               client->socket, state);
+               pthread_mutex_lock(&client->lock);
+               ret = notification_thread_client_disconnect(
+                               client, state);
+               pthread_mutex_unlock(&client->lock);
                if (ret) {
                        error_encoutered = true;
                }
@@ -2600,6 +2720,7 @@ int handle_notification_thread_trigger_unregister_all(
        return error_occurred ? -1 : 0;
 }
 
+/* Client lock must be acquired by caller. */
 static
 int client_flush_outgoing_queue(struct notification_client *client,
                struct notification_thread_state *state)
@@ -2607,6 +2728,8 @@ int client_flush_outgoing_queue(struct notification_client *client,
        ssize_t ret;
        size_t to_send_count;
 
+       ASSERT_LOCKED(client->lock);
+
        assert(client->communication.outbound.buffer.size != 0);
        to_send_count = client->communication.outbound.buffer.size;
        DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
@@ -2644,8 +2767,7 @@ int client_flush_outgoing_queue(struct notification_client *client,
                /* Generic error, disconnect the client. */
                ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
                                client->socket);
-               ret = handle_notification_thread_client_disconnect(
-                               client->socket, state);
+               ret = notification_thread_client_disconnect(client, state);
                if (ret) {
                        goto error;
                }
@@ -2671,6 +2793,7 @@ error:
        return -1;
 }
 
+/* Client lock must be acquired by caller. */
 static
 int client_send_command_reply(struct notification_client *client,
                struct notification_thread_state *state,
@@ -2686,6 +2809,8 @@ int client_send_command_reply(struct notification_client *client,
        };
        char buffer[sizeof(msg) + sizeof(reply)];
 
+       ASSERT_LOCKED(client->lock);
+
        if (client->communication.outbound.queued_command_reply) {
                /* Protocol error. */
                goto error;
@@ -2718,6 +2843,194 @@ error:
        return -1;
 }
 
+static
+int client_handle_message_unknown(struct notification_client *client,
+               struct notification_thread_state *state)
+{
+       int ret;
+
+       pthread_mutex_lock(&client->lock);
+
+       /*
+        * Receiving message header. The function will be called again
+        * once the rest of the message as been received and can be
+        * interpreted.
+        */
+       const struct lttng_notification_channel_message *msg;
+
+       assert(sizeof(*msg) == client->communication.inbound.buffer.size);
+       msg = (const struct lttng_notification_channel_message *)
+                             client->communication.inbound.buffer.data;
+
+       if (msg->size == 0 ||
+                       msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
+               ERR("[notification-thread] Invalid notification channel message: length = %u",
+                               msg->size);
+               ret = -1;
+               goto end;
+       }
+
+       switch (msg->type) {
+       case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
+       case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
+       case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
+               break;
+       default:
+               ret = -1;
+               ERR("[notification-thread] Invalid notification channel message: unexpected message type");
+               goto end;
+       }
+
+       client->communication.inbound.bytes_to_receive = msg->size;
+       client->communication.inbound.msg_type =
+                       (enum lttng_notification_channel_message_type) msg->type;
+       ret = lttng_dynamic_buffer_set_size(
+                       &client->communication.inbound.buffer, msg->size);
+end:
+       pthread_mutex_unlock(&client->lock);
+       return ret;
+}
+
+static
+int client_handle_message_handshake(struct notification_client *client,
+               struct notification_thread_state *state)
+{
+       int ret;
+       struct lttng_notification_channel_command_handshake *handshake_client;
+       const struct lttng_notification_channel_command_handshake handshake_reply = {
+                       .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
+                       .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
+       };
+       const struct lttng_notification_channel_message msg_header = {
+                       .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
+                       .size = sizeof(handshake_reply),
+       };
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+       char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
+
+       pthread_mutex_lock(&client->lock);
+
+       memcpy(send_buffer, &msg_header, sizeof(msg_header));
+       memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
+                       sizeof(handshake_reply));
+
+       handshake_client =
+                       (struct lttng_notification_channel_command_handshake *)
+                                       client->communication.inbound.buffer
+                                                       .data;
+       client->major = handshake_client->major;
+       client->minor = handshake_client->minor;
+       if (!client->communication.inbound.creds_received) {
+               ERR("[notification-thread] No credentials received from client");
+               ret = -1;
+               goto end;
+       }
+
+       client->uid = LTTNG_SOCK_GET_UID_CRED(
+                       &client->communication.inbound.creds);
+       client->gid = LTTNG_SOCK_GET_GID_CRED(
+                       &client->communication.inbound.creds);
+       DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
+                       client->uid, client->gid, (int) client->major,
+                       (int) client->minor);
+
+       if (handshake_client->major !=
+                       LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
+       }
+
+       ret = lttng_dynamic_buffer_append(
+                       &client->communication.outbound.buffer, send_buffer,
+                       sizeof(send_buffer));
+       if (ret) {
+               ERR("[notification-thread] Failed to send protocol version to notification channel client");
+               goto end;
+       }
+
+       client->validated = true;
+       client->communication.active = true;
+
+       ret = client_flush_outgoing_queue(client, state);
+       if (ret) {
+               goto end;
+       }
+
+       ret = client_send_command_reply(client, state, status);
+       if (ret) {
+               ERR("[notification-thread] Failed to send reply to notification channel client");
+               goto end;
+       }
+
+       /* Set reception state to receive the next message header. */
+       ret = client_reset_inbound_state(client);
+       if (ret) {
+               ERR("[notification-thread] Failed to reset client communication's inbound state");
+               goto end;
+       }
+
+end:
+       pthread_mutex_unlock(&client->lock);
+       return ret;
+}
+
+static
+int client_handle_message_subscription(
+               struct notification_client *client,
+               enum lttng_notification_channel_message_type msg_type,
+               struct notification_thread_state *state)
+{
+       int ret;
+       struct lttng_condition *condition;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+       struct lttng_payload_view condition_view =
+                       lttng_payload_view_from_dynamic_buffer(
+                                       &client->communication.inbound.buffer,
+                                       0, -1);
+       size_t expected_condition_size;
+
+       pthread_mutex_lock(&client->lock);
+       expected_condition_size = client->communication.inbound.buffer.size;
+       pthread_mutex_unlock(&client->lock);
+
+       ret = lttng_condition_create_from_payload(&condition_view, &condition);
+       if (ret != expected_condition_size) {
+               ERR("[notification-thread] Malformed condition received from client");
+               goto end;
+       }
+
+       if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
+               ret = notification_thread_client_subscribe(
+                               client, condition, state, &status);
+       } else {
+               ret = notification_thread_client_unsubscribe(
+                               client, condition, state, &status);
+       }
+       if (ret) {
+               goto end;
+       }
+
+       pthread_mutex_lock(&client->lock);
+       ret = client_send_command_reply(client, state, status);
+       if (ret) {
+               ERR("[notification-thread] Failed to send reply to notification channel client");
+               goto end_unlock;
+       }
+
+       /* Set reception state to receive the next message header. */
+       ret = client_reset_inbound_state(client);
+       if (ret) {
+               ERR("[notification-thread] Failed to reset client communication's inbound state");
+               goto end_unlock;
+       }
+
+end_unlock:
+       pthread_mutex_unlock(&client->lock);
+end:
+       return ret;
+}
+
 static
 int client_dispatch_message(struct notification_client *client,
                struct notification_thread_state *state)
@@ -2737,159 +3050,19 @@ int client_dispatch_message(struct notification_client *client,
        switch (client->communication.inbound.msg_type) {
        case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
        {
-               /*
-                * Receiving message header. The function will be called again
-                * once the rest of the message as been received and can be
-                * interpreted.
-                */
-               const struct lttng_notification_channel_message *msg;
-
-               assert(sizeof(*msg) ==
-                               client->communication.inbound.buffer.size);
-               msg = (const struct lttng_notification_channel_message *)
-                               client->communication.inbound.buffer.data;
-
-               if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
-                       ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
-                       ret = -1;
-                       goto end;
-               }
-
-               switch (msg->type) {
-               case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
-               case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
-               case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
-                       break;
-               default:
-                       ret = -1;
-                       ERR("[notification-thread] Invalid notification channel message: unexpected message type");
-                       goto end;
-               }
-
-               client->communication.inbound.bytes_to_receive = msg->size;
-               client->communication.inbound.msg_type =
-                               (enum lttng_notification_channel_message_type) msg->type;
-               ret = lttng_dynamic_buffer_set_size(
-                               &client->communication.inbound.buffer, msg->size);
-               if (ret) {
-                       goto end;
-               }
+               ret = client_handle_message_unknown(client, state);
                break;
        }
        case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
        {
-               struct lttng_notification_channel_command_handshake *handshake_client;
-               struct lttng_notification_channel_command_handshake handshake_reply = {
-                       .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
-                       .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
-               };
-               struct lttng_notification_channel_message msg_header = {
-                       .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
-                       .size = sizeof(handshake_reply),
-               };
-               enum lttng_notification_channel_status status =
-                               LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
-               char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
-
-               memcpy(send_buffer, &msg_header, sizeof(msg_header));
-               memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
-                               sizeof(handshake_reply));
-
-               handshake_client =
-                               (struct lttng_notification_channel_command_handshake *)
-                                       client->communication.inbound.buffer.data;
-               client->major = handshake_client->major;
-               client->minor = handshake_client->minor;
-               if (!client->communication.inbound.creds_received) {
-                       ERR("[notification-thread] No credentials received from client");
-                       ret = -1;
-                       goto end;
-               }
-
-               client->uid = LTTNG_SOCK_GET_UID_CRED(
-                               &client->communication.inbound.creds);
-               client->gid = LTTNG_SOCK_GET_GID_CRED(
-                               &client->communication.inbound.creds);
-               DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
-                               client->uid, client->gid, (int) client->major,
-                               (int) client->minor);
-
-               if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
-                       status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
-               }
-
-               ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
-                               send_buffer, sizeof(send_buffer));
-               if (ret) {
-                       ERR("[notification-thread] Failed to send protocol version to notification channel client");
-                       goto end;
-               }
-
-               ret = client_flush_outgoing_queue(client, state);
-               if (ret) {
-                       goto end;
-               }
-
-               ret = client_send_command_reply(client, state, status);
-               if (ret) {
-                       ERR("[notification-thread] Failed to send reply to notification channel client");
-                       goto end;
-               }
-
-               /* Set reception state to receive the next message header. */
-               ret = client_reset_inbound_state(client);
-               if (ret) {
-                       ERR("[notification-thread] Failed to reset client communication's inbound state");
-                       goto end;
-               }
-               client->validated = true;
-               client->communication.active = true;
+               ret = client_handle_message_handshake(client, state);
                break;
        }
        case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
        case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
        {
-               struct lttng_condition *condition;
-               enum lttng_notification_channel_status status =
-                               LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
-               struct lttng_payload_view condition_view =
-                               lttng_payload_view_from_dynamic_buffer(
-                                       &client->communication.inbound.buffer,
-                                       0, -1);
-               size_t expected_condition_size =
-                               client->communication.inbound.buffer.size;
-
-               ret = lttng_condition_create_from_payload(&condition_view,
-                               &condition);
-               if (ret != expected_condition_size) {
-                       ERR("[notification-thread] Malformed condition received from client");
-                       goto end;
-               }
-
-               if (client->communication.inbound.msg_type ==
-                               LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
-                       ret = notification_thread_client_subscribe(client,
-                                       condition, state, &status);
-               } else {
-                       ret = notification_thread_client_unsubscribe(client,
-                                       condition, state, &status);
-               }
-               if (ret) {
-                       goto end;
-               }
-
-               ret = client_send_command_reply(client, state, status);
-               if (ret) {
-                       ERR("[notification-thread] Failed to send reply to notification channel client");
-                       goto end;
-               }
-
-               /* Set reception state to receive the next message header. */
-               ret = client_reset_inbound_state(client);
-               if (ret) {
-                       ERR("[notification-thread] Failed to reset client communication's inbound state");
-                       goto end;
-               }
+               ret = client_handle_message_subscription(client,
+                               client->communication.inbound.msg_type, state);
                break;
        }
        default:
@@ -2907,6 +3080,7 @@ int handle_notification_thread_client_in(
        struct notification_client *client;
        ssize_t recv_ret;
        size_t offset;
+       bool message_is_complete = false;
 
        client = get_client_from_socket(socket, state);
        if (!client) {
@@ -2915,6 +3089,7 @@ int handle_notification_thread_client_in(
                goto end;
        }
 
+       pthread_mutex_lock(&client->lock);
        offset = client->communication.inbound.buffer.size -
                        client->communication.inbound.bytes_to_receive;
        if (client->communication.inbound.expect_creds) {
@@ -2931,12 +3106,17 @@ int handle_notification_thread_client_in(
                                client->communication.inbound.buffer.data + offset,
                                client->communication.inbound.bytes_to_receive);
        }
+       if (recv_ret >= 0) {
+               client->communication.inbound.bytes_to_receive -= recv_ret;
+               message_is_complete = client->communication.inbound
+                                                     .bytes_to_receive == 0;
+       }
+       pthread_mutex_unlock(&client->lock);
        if (recv_ret < 0) {
                goto error_disconnect_client;
        }
 
-       client->communication.inbound.bytes_to_receive -= recv_ret;
-       if (client->communication.inbound.bytes_to_receive == 0) {
+       if (message_is_complete) {
                ret = client_dispatch_message(client, state);
                if (ret) {
                        /*
@@ -2945,13 +3125,13 @@ int handle_notification_thread_client_in(
                         */
                        goto error_disconnect_client;
                }
-       } else {
-               goto end;
        }
 end:
        return ret;
 error_disconnect_client:
-       ret = handle_notification_thread_client_disconnect(socket, state);
+       pthread_mutex_lock(&client->lock);
+       ret = notification_thread_client_disconnect(client, state);
+       pthread_mutex_unlock(&client->lock);
        return ret;
 }
 
@@ -2969,7 +3149,9 @@ int handle_notification_thread_client_out(
                goto end;
        }
 
+       pthread_mutex_lock(&client->lock);
        ret = client_flush_outgoing_queue(client, state);
+       pthread_mutex_unlock(&client->lock);
        if (ret) {
                goto end;
        }
@@ -3148,6 +3330,8 @@ int client_enqueue_dropped_notification(struct notification_client *client)
                .size = 0,
        };
 
+       ASSERT_LOCKED(client->lock);
+
        ret = lttng_dynamic_buffer_append(
                        &client->communication.outbound.buffer, &msg,
                        sizeof(msg));
@@ -3222,24 +3406,28 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger,
        }
 
        /* Update payload size. */
-       ((struct lttng_notification_channel_message * ) msg_payload.buffer.data)->size =
-                       (uint32_t) (msg_payload.buffer.size - sizeof(msg_header));
+       ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
+                       ->size = (uint32_t)(
+                       msg_payload.buffer.size - sizeof(msg_header));
 
+       pthread_mutex_lock(&client_list->lock);
        cds_list_for_each_entry_safe(client_list_element, tmp,
                        &client_list->list, node) {
                struct notification_client *client =
                                client_list_element->client;
 
+               ret = 0;
+               pthread_mutex_lock(&client->lock);
                if (client->uid != object_uid && client->gid != object_gid &&
                                client->uid != 0) {
                        /* Client is not allowed to monitor this channel. */
                        DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
-                       continue;
+                       goto unlock_client;
                }
 
                if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
                        DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
-                       continue;
+                       goto unlock_client;
                }
 
                DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
@@ -3259,25 +3447,33 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger,
                                ret = client_enqueue_dropped_notification(
                                                client);
                                if (ret) {
-                                       goto end;
+                                       goto unlock_client;
                                }
                        }
-                       continue;
+                       goto unlock_client;
                }
 
                ret = lttng_dynamic_buffer_append_buffer(
                                &client->communication.outbound.buffer,
                                &msg_payload.buffer);
                if (ret) {
-                       goto end;
+                       goto unlock_client;
                }
 
                ret = client_flush_outgoing_queue(client, state);
                if (ret) {
-                       goto end;
+                       goto unlock_client;
+               }
+unlock_client:
+               pthread_mutex_unlock(&client->lock);
+               if (ret) {
+                       goto end_unlock_list;
                }
        }
        ret = 0;
+
+end_unlock_list:
+       pthread_mutex_unlock(&client_list->lock);
 end:
        lttng_payload_reset(&msg_payload);
        return ret;
@@ -3422,9 +3618,11 @@ int handle_notification_thread_channel_sample(
                const struct lttng_condition *condition;
                const struct lttng_action *action;
                const struct lttng_trigger *trigger;
-               struct notification_client_list *client_list;
+               struct notification_client_list *client_list = NULL;
                struct lttng_evaluation *evaluation = NULL;
+               bool client_list_is_empty;
 
+               ret = 0;
                trigger = trigger_list_element->trigger;
                condition = lttng_trigger_get_const_condition(trigger);
                assert(condition);
@@ -3440,12 +3638,13 @@ int handle_notification_thread_channel_sample(
                 */
                client_list = get_client_list_from_condition(state, condition);
                assert(client_list);
-               if (cds_list_empty(&client_list->list)) {
+               client_list_is_empty = cds_list_empty(&client_list->list);
+               if (client_list_is_empty) {
                        /*
                         * No clients interested in the evaluation's result,
                         * skip it.
                         */
-                       continue;
+                       goto put_list;
                }
 
                ret = evaluate_buffer_condition(condition, &evaluation, state,
@@ -3455,11 +3654,11 @@ int handle_notification_thread_channel_sample(
                                latest_session_consumed_total,
                                channel_info);
                if (caa_unlikely(ret)) {
-                       goto end_unlock;
+                       goto put_list;
                }
 
                if (caa_likely(!evaluation)) {
-                       continue;
+                       goto put_list;
                }
 
                /* Dispatch evaluation result to all clients. */
@@ -3468,8 +3667,10 @@ int handle_notification_thread_channel_sample(
                                channel_info->session_info->uid,
                                channel_info->session_info->gid);
                lttng_evaluation_destroy(evaluation);
+put_list:
+               notification_client_list_put(client_list);
                if (caa_unlikely(ret)) {
-                       goto end_unlock;
+                       break;
                }
        }
 end_unlock:
This page took 0.04365 seconds and 4 git commands to generate.