Fix: perform relayd socket pair cleanup on control socket error
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Wed, 6 Jun 2018 01:00:28 +0000 (21:00 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 13 Sep 2018 21:12:54 +0000 (17:12 -0400)
A reference to the local context for the socket pair is used to "force" an
evaluation of the data and metadata streams since we changed the endpoint
status. This imitates what is currently done for the data socket.

This prevents hitting network timeouts multiple times in a row when an
error occurs. For now, there is no mechanism for retry hence
"terminating" all communication make sense and prevent unwanted delays
on operation.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer-stream.c
src/common/consumer/consumer.c
src/common/consumer/consumer.h

index d0b1ddef2259fc57342cac15ef5003f19fc03e6f..ca2c4536cb78f99758ba1a8ec2e2b81fead12f4f 100644 (file)
@@ -73,12 +73,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
                        stream->next_net_seq_num - 1);
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        if (ret < 0) {
-               DBG("Unable to close stream on the relayd. Continuing");
-               /*
-                * Continue here. There is nothing we can do for the relayd.
-                * Chances are that the relayd has closed the socket so we just
-                * continue cleaning up.
-                */
+               ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
        /* Both conditions are met, we destroy the relayd. */
@@ -371,6 +367,15 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        ret = relayd_send_index(&relayd->control_sock, element,
                                stream->relayd_stream_id, stream->next_net_seq_num - 1);
+                       if (ret < 0) {
+                               /*
+                                * Communication error with lttng-relayd,
+                                * perform cleanup now
+                                */
+                               ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               ret = -1;
+                       }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                } else {
                        ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
index e0d6ea496ade52af826cf02b0fc9fe0054b64895..63a2ab288c865e338582485199a7a268064bb4af 100644 (file)
@@ -465,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
  * If a local data context is available, notify the threads that the streams'
  * state have changed.
  */
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
-               struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        uint64_t netidx;
 
        assert(relayd);
 
-       DBG("Cleaning up relayd sockets");
+       DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
 
        /* Save the net sequence index before destroying the object */
        netidx = relayd->net_seq_idx;
@@ -492,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * memory barrier ordering the updates of the end point status from the
         * read of this status which happens AFTER receiving this notify.
         */
-       if (ctx) {
-               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
-       }
+       notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+       notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
 }
 
 /*
@@ -813,6 +810,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                                stream->trace_archive_id);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
 
@@ -854,6 +853,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
                ret = relayd_streams_sent(&relayd->control_sock);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
        } else {
@@ -1721,7 +1722,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
 end:
@@ -1947,7 +1949,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
                /* Skip splice error so the consumer does not fail */
                goto end;
        }
@@ -3646,6 +3649,7 @@ error:
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
         */
+       relayd->ctx = ctx;
        add_relayd(relayd);
 
        /* All good! */
@@ -3772,6 +3776,8 @@ int consumer_data_pending(uint64_t id)
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        /* Communication error thus the relayd so no data pending. */
+                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
        }
@@ -3813,6 +3819,13 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
+                       if (ret < 0) {
+                               ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_pending;
+                       }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
@@ -3831,6 +3844,8 @@ int consumer_data_pending(uint64_t id)
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
                if (is_data_inflight) {
index 190de54ed79f0c91e0e7abe6f5d9f864ed9795fc..4cc40a431ee7ba86b0a54913aa73b43bdc3216f4 100644 (file)
@@ -512,6 +512,7 @@ struct consumer_relayd_sock_pair {
        /* Session id on both sides for the sockets. */
        uint64_t relayd_session_id;
        uint64_t sessiond_session_id;
+       struct lttng_consumer_local_data *ctx;
 };
 
 /*
@@ -840,5 +841,6 @@ int lttng_consumer_rotate_pending_relay( uint64_t session_id,
 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
 int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
                uint64_t relayd_id);
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.030182 seconds and 4 git commands to generate.