From: David Goulet Date: Wed, 15 Aug 2012 20:41:37 +0000 (-0400) Subject: Fix: multiple consumer locking problems X-Git-Tag: v2.1.0-rc1~16 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=3f8e211fbe73cbcf69d52af5e839b14d1a951ed7;p=lttng-tools.git Fix: multiple consumer locking problems First, a lot of rcu_read_unlock() were missing the consumer command handler which could make a rcu lock and return on error without unlocking. Fix goto error path in the consumer. Fix a missing lock control socket mutex. Fix memory leaks in a UST session where the consumers output object were not freed during a destroy command. Add a relayd sockets sent flag so we don't resend existing sockets to the consumer during an enable_consumer command. Signed-off-by: David Goulet --- diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 68a51bb6c..c85529cfd 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -86,6 +86,9 @@ struct consumer_net { /* Data path for network streaming. */ struct lttng_uri data; + + /* Flag if network sockets were sent to the consumer. */ + unsigned int relayd_socks_sent; }; /* @@ -98,8 +101,8 @@ struct consumer_output { /* * The net_seq_index is the index of the network stream on the consumer - * side. It's basically the relayd socket file descriptor value so the - * consumer can identify which streams goes with which socket. + * side. It tells the consumer which streams goes to which relayd with this + * index. The relayd sockets are index with it on the consumer side. */ int net_seq_index; diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 1e9c0be94..92bf85bc6 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -448,8 +448,8 @@ static void teardown_ust_session(struct ltt_session *session) * lock. This means that there CAN NOT be stream(s) being sent to a * consumer since this action also requires the session lock at any time. * - * At this point, we are sure that not streams data will be lost after this - * command is issued. + * At this point, we are sure that no data will be lost after this command + * is issued. */ if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) { cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket, @@ -2097,6 +2097,15 @@ static int send_sockets_relayd_consumer(int domain, { int ret; + assert(session); + assert(consumer); + + /* Don't resend the sockets to the consumer. */ + if (consumer->dst.net.relayd_socks_sent) { + ret = LTTCOMM_OK; + goto error; + } + /* Sending control relayd socket. */ ret = send_socket_relayd_consumer(domain, session, &consumer->dst.net.control, consumer, fd); @@ -2111,6 +2120,9 @@ static int send_sockets_relayd_consumer(int domain, goto error; } + /* Flag that all relayd sockets were sent to the consumer. */ + consumer->dst.net.relayd_socks_sent = 1; + error: return ret; } @@ -3647,7 +3659,9 @@ skip_consumer: session_error: session_destroy(session); error: + rcu_read_lock(); consumer_destroy_output(consumer); + rcu_read_unlock(); consumer_error: return ret; } @@ -3949,7 +3963,6 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, case LTTNG_DOMAIN_KERNEL: /* Code flow error if we don't have a kernel session here. */ assert(ksess); - assert(ksess->consumer); /* Create consumer output if none exists */ consumer = ksess->tmp_consumer; @@ -3990,8 +4003,12 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, goto error; } - /* Don't send relayd socket if URI is NOT remote */ - if (uris[i].dtype == LTTNG_DST_PATH) { + /* + * Don't send relayd socket if URI is NOT remote or if the relayd + * sockets for the session are already sent. + */ + if (uris[i].dtype == LTTNG_DST_PATH || + consumer->dst.net.relayd_socks_sent) { continue; } @@ -4180,7 +4197,9 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) * session without this lock hence freeing the consumer output object * is valid. */ + rcu_read_lock(); consumer_destroy_output(ksess->consumer); + rcu_read_unlock(); ksess->consumer = consumer; ksess->tmp_consumer = NULL; @@ -4263,7 +4282,9 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) * session without this lock hence freeing the consumer output object * is valid. */ + rcu_read_lock(); consumer_destroy_output(usess->consumer); + rcu_read_unlock(); usess->consumer = consumer; usess->tmp_consumer = NULL; diff --git a/src/bin/lttng-sessiond/trace-ust.c b/src/bin/lttng-sessiond/trace-ust.c index d1e8b8dae..3e382b7a5 100644 --- a/src/bin/lttng-sessiond/trace-ust.c +++ b/src/bin/lttng-sessiond/trace-ust.c @@ -578,6 +578,9 @@ void trace_ust_destroy_session(struct ltt_ust_session *session) destroy_domain_pid(session->domain_pid); destroy_domain_exec(session->domain_exec); + consumer_destroy_output(session->consumer); + consumer_destroy_output(session->tmp_consumer); + free(session); rcu_read_unlock(); diff --git a/src/common/consumer.c b/src/common/consumer.c index deebd2e2b..c1dadddb3 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -274,9 +274,12 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) uatomic_dec(&relayd->refcount); assert(uatomic_read(&relayd->refcount) >= 0); + /* Closing streams requires to lock the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_close_stream(&relayd->control_sock, stream->relayd_stream_id, stream->next_net_seq_num - 1); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { DBG("Unable to close stream on the relayd. Continuing"); /* @@ -438,8 +441,10 @@ end: } /* - * Add relayd socket to global consumer data hashtable. + * Add relayd socket to global consumer data hashtable. RCU read side lock MUST + * be acquired before calling this. */ + int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) { int ret = 0; @@ -451,20 +456,15 @@ int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) goto end; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) relayd->net_seq_idx), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { - rcu_read_unlock(); /* Relayd already exist. Ignore the insertion */ goto end; } lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); - rcu_read_unlock(); - end: return ret; } diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 8c2bee333..f5eb35c31 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -123,6 +123,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Poll on consumer socket. */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } @@ -214,6 +215,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } @@ -221,6 +223,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -237,7 +240,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + goto end_nosignal; } /* The stream is not metadata. Get relayd reference if exists. */ @@ -250,13 +253,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - goto end; + goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); free(new_stream); - goto end; + goto end_nosignal; } if (ctx->on_recv_stream != NULL) { @@ -264,7 +267,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret == 0) { consumer_add_stream(new_stream); } else if (ret < 0) { - goto end; + goto end_nosignal; } } else { consumer_add_stream(new_stream); @@ -275,30 +278,43 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { - if (ctx->on_update_stream != NULL) { - ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); - if (ret == 0) { - consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); - } else if (ret < 0) { - goto end; - } - } else { - consumer_change_stream_state(msg.u.stream.stream_key, - msg.u.stream.state); + rcu_read_unlock(); + return -ENOSYS; + } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + struct consumer_relayd_sock_pair *relayd; + + DBG("Kernel consumer destroying relayd %zu", + msg.u.destroy_relayd.net_seq_idx); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); + if (relayd == NULL) { + ERR("Unable to find relayd %zu", msg.u.destroy_relayd.net_seq_idx); + goto end_nosignal; } - break; + + /* Set destroy flag for this object */ + uatomic_set(&relayd->destroy_flag, 1); + + /* Destroy the relayd if refcount is 0 else set the destroy flag. */ + if (uatomic_read(&relayd->refcount) == 0) { + consumer_destroy_relayd(relayd); + } + goto end_nosignal; } default: - break; + goto end_nosignal; } -end: + /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: + * Wake-up the other end by writing a null byte in the pipe (non-blocking). + * Important note: Because writing into the pipe is non-blocking (and + * therefore we allow dropping wakeup data, as long as there is wakeup data + * present in the pipe buffer to wake up the other end), the other end + * should perform the following sequence for waiting: + * * 1) empty the pipe (reads). * 2) perform update operation. * 3) wait on the pipe (poll). diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index c92d59d02..486ca2634 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -108,7 +108,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return -ENOENT; } - /* relayd need RCU read-side lock */ + /* relayd needs RCU read-side lock */ rcu_read_lock(); switch (msg.cmd_type) { @@ -133,6 +133,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Poll on consumer socket. */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } @@ -200,11 +201,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -241,11 +244,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -267,7 +272,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + goto end_nosignal; } /* The stream is not metadata. Get relayd reference if exists. */ @@ -280,13 +285,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - goto end; + goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); free(new_stream); - goto end; + goto end_nosignal; } if (ctx->on_recv_stream != NULL) { @@ -294,7 +299,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret == 0) { consumer_add_stream(new_stream); } else if (ret < 0) { - goto end; + goto end_nosignal; } } else { consumer_add_stream(new_stream); @@ -315,8 +320,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get relayd reference if exists. */ relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); if (relayd == NULL) { - ERR("Unable to find relayd %zu", - msg.u.destroy_relayd.net_seq_idx); + ERR("Unable to find relayd %zu", msg.u.destroy_relayd.net_seq_idx); + goto end_nosignal; } /* Set destroy flag for this object */ @@ -326,10 +331,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (uatomic_read(&relayd->refcount) == 0) { consumer_destroy_relayd(relayd); } - break; + goto end_nosignal; } case LTTNG_CONSUMER_UPDATE_STREAM: { + rcu_read_unlock(); return -ENOSYS; #if 0 if (ctx->on_update_stream != NULL) { @@ -343,20 +349,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); } -#endif break; +#endif } default: break; } -end: + /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: + * Wake-up the other end by writing a null byte in the pipe (non-blocking). + * Important note: Because writing into the pipe is non-blocking (and + * therefore we allow dropping wakeup data, as long as there is wakeup data + * present in the pipe buffer to wake up the other end), the other end + * should perform the following sequence for waiting: + * * 1) empty the pipe (reads). * 2) perform update operation. * 3) wait on the pipe (poll).