Relay rotate pending command
authorJulien Desfossez <jdesfossez@efficios.com>
Mon, 18 Dec 2017 21:04:44 +0000 (16:04 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 21 Mar 2018 15:25:11 +0000 (11:25 -0400)
When a session rotation completes and the session is configured to send
its traces to a relay, we have to poll the relay to know when all the
chunk's data are written on its disk. To do that, we define a timer in
the sessiond and arm it when the rotation is complete. When the rotation
is complete on the relay, we clear the "rotate_pending" flag in the
session and the client can access the chunk safely.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
20 files changed:
configure.ac
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/rotate.c
src/bin/lttng-sessiond/rotate.h
src/bin/lttng-sessiond/rotation-thread.c
src/bin/lttng-sessiond/session.c
src/bin/lttng-sessiond/session.h
src/bin/lttng-sessiond/sessiond-timer.c
src/bin/lttng-sessiond/sessiond-timer.h
src/common/consumer/consumer.c
src/common/defaults.h
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 972a73e8b5c79b7419685a1377b4452603848400..71ec278036b1a5b23fbc25fc827a478e8eb927e5 100644 (file)
@@ -365,6 +365,7 @@ _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_AGENT_BIND_ADDRESS], [localhost])
 _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_CONTROL_BIND_ADDRESS], [0.0.0.0])
 _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_DATA_BIND_ADDRESS], [0.0.0.0])
 _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_VIEWER_BIND_ADDRESS], [localhost])
+_AC_DEFINE_AND_SUBST([DEFAULT_ROTATE_PENDING_RELAY_TIMER], [200000])
 
 # Command short descriptions
 _AC_DEFINE_QUOTED_AND_SUBST([CMD_DESCR_ADD_CONTEXT], [Add context fields to a channel])
index 45b7de8c657f9b43a426057fc07f7a299689dc75..f68ecdfbcbfe6ab9248c8603676a6a811e59d9aa 100644 (file)
@@ -2803,7 +2803,7 @@ int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
 {
        struct relay_session *session = conn->session;
        struct lttcomm_relayd_rotate_pending msg;
-       struct lttcomm_relayd_generic_reply reply;
+       struct lttcomm_relayd_rotate_pending_reply reply;
        struct lttng_ht_iter iter;
        struct relay_stream *stream;
        int ret = 0;
@@ -2883,7 +2883,8 @@ int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
 send_reply:
        rcu_read_unlock();
        memset(&reply, 0, sizeof(reply));
-       reply.ret_code = htobe32(rotate_pending ? 1 : 0);
+       reply.generic.ret_code = htobe32((uint32_t) LTTNG_OK);
+       reply.is_pending = (uint8_t) !!rotate_pending;
        network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
                        sizeof(reply), 0);
        if (network_ret < (ssize_t) sizeof(reply)) {
index 606811e18b791784660b5145d9042b675f38dc31..a2af4c38aa40a9a040ecb5e911408367d9ef2213 100644 (file)
@@ -2651,6 +2651,10 @@ int cmd_stop_trace(struct ltt_session *session)
                goto error;
        }
 
+       if (session->rotate_relay_pending_timer_enabled) {
+               sessiond_timer_rotate_pending_stop(session);
+       }
+
        if (session->rotate_count > 0 && !session->rotate_pending) {
                ret = rename_active_chunk(session);
                if (ret) {
@@ -2949,6 +2953,10 @@ int cmd_destroy_session(struct ltt_session *session, int wpipe)
 
        DBG("Begin destroy session %s (id %" PRIu64 ")", session->name, session->id);
 
+       if (session->rotate_relay_pending_timer_enabled) {
+               sessiond_timer_rotate_pending_stop(session);
+       }
+
        /*
         * The rename of the current chunk is performed at stop, but if we rotated
         * the session after the previous stop command, we need to rename the
index a226b57268ce008fdbabbcf34355e70f4966f775..f89bb1df290fbc8d5f6682e1f1089448974ca40e 100644 (file)
@@ -1735,6 +1735,51 @@ error:
        return ret;
 }
 
+/*
+ * Ask the relay if a rotation is still pending. Must be called with the socket
+ * lock held.
+ *
+ * Return 1 if the rotation is still pending, 0 if finished, a negative value
+ * on error.
+ */
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
+               struct consumer_output *output, uint64_t session_id,
+               uint64_t chunk_id)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+       uint32_t pending = 0;
+
+       assert(socket);
+
+       DBG("Consumer rotate pending on relay for session %" PRIu64 ", chunk id % " PRIu64,
+                       session_id, chunk_id);
+       assert(output->type == CONSUMER_DST_NET);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY;
+       msg.u.rotate_pending_relay.session_id = session_id;
+       msg.u.rotate_pending_relay.relayd_id = output->net_seq_index;
+       msg.u.rotate_pending_relay.chunk_id = chunk_id;
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+       ret = consumer_socket_recv(socket, &pending, sizeof(pending));
+       if (ret < 0) {
+               goto error;
+       }
+
+       ret = pending;
+
+error:
+       health_code_update();
+       return ret;
+}
+
 /*
  * Ask the consumer to create a directory.
  *
index fbd5b11529e156fa90dceff06ef9126fac1cb3a3..d875f2c6dda7aaa228235f5756cc061766a9c81e 100644 (file)
@@ -329,6 +329,9 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
 int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
                const struct consumer_output *output, const char *old_path,
                const char *new_path, uid_t uid, gid_t gid);
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
+               struct consumer_output *output, uint64_t session_id,
+               uint64_t chunk_id);
 int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id,
                const struct consumer_output *output, const char *path,
                uid_t uid, gid_t gid);
index 49ccea94e381fd6ee520162b8c362f310efe7e9e..3ca80ce6a8a16ad970b6a9c99ddf2a3543201b2d 100644 (file)
@@ -32,6 +32,8 @@
 #include <signal.h>
 #include <inttypes.h>
 
+#include <lttng/rotate-internal.h>
+
 #include "session.h"
 #include "rotate.h"
 #include "rotation-thread.h"
@@ -350,3 +352,50 @@ error:
 end:
        return ret;
 }
+
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id)
+{
+       int ret;
+       struct consumer_socket *socket;
+       struct consumer_output *output;
+       struct lttng_ht_iter iter;
+
+       /*
+        * Either one of the sessions is enough to find the consumer_output
+        * and uid/gid.
+        */
+       if (session->kernel_session) {
+               output = session->kernel_session->consumer;
+       } else if (session->ust_session) {
+               output = session->ust_session->consumer;
+       } else {
+               assert(0);
+       }
+
+       if (!output || !output->socks) {
+               ERR("No consumer output found");
+               ret = -1;
+               goto end;
+       }
+
+       ret = -1;
+
+       rcu_read_lock();
+       /*
+        * We have to iterate to find a socket, but we only need to send the
+        * rotate pending command to one consumer, so we break after the first
+        * one.
+        */
+       cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket,
+                       node.node) {
+               pthread_mutex_lock(socket->lock);
+               ret = consumer_rotate_pending_relay(socket, output, session->id,
+                               chunk_id);
+               pthread_mutex_unlock(socket->lock);
+               break;
+       }
+       rcu_read_unlock();
+
+end:
+       return ret;
+}
index e2c0829bc5bf3b7384392c4cc33f175be3ea0c69..ea2383f2c9415cc14286587bc74804da5f8f1c89 100644 (file)
@@ -48,6 +48,8 @@ unsigned long hash_channel_key(struct rotation_channel_key *key);
 /* session lock must be held by this function's caller. */
 int rename_complete_chunk(struct ltt_session *session, time_t ts);
 
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id);
+
 /*
  * When we start the rotation of a channel, we add its information in
  * channel_pending_rotate_ht. This is called in the context of
index a1471e3c4e33622c096e8b49c7ac454ff0515b27..21f20b40341023d78aa6533971bd77aa97e6c375 100644 (file)
@@ -369,6 +369,16 @@ int handle_channel_rotation_pipe(int fd, uint32_t revents,
                session->rotate_pending = false;
                session->rotation_status = LTTNG_ROTATION_STATUS_COMPLETED;
                session->last_chunk_start_ts = session->current_chunk_start_ts;
+               if (session->rotate_pending_relay) {
+                       ret = sessiond_timer_rotate_pending_start(
+                                       session,
+                                       DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+                       if (ret) {
+                               ERR("Failed to enable rotate pending timer");
+                               ret = -1;
+                               goto end_unlock_session;
+                       }
+               }
                DBG("Rotation completed for session %s", session->name);
        }
 
@@ -384,6 +394,134 @@ end:
        return ret;
 }
 
+/*
+ * Process the rotate_pending check, called with session lock held.
+ */
+static
+int rotate_pending_relay_timer(struct ltt_session *session)
+{
+       int ret;
+
+       DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
+                       session->id);
+       ret = relay_rotate_pending(session, session->rotate_count - 1);
+       if (ret < 0) {
+               ERR("[rotation-thread] Check relay rotate pending");
+               goto end;
+       }
+       if (ret == 0) {
+               DBG("[rotation-thread] Rotation completed on the relay for "
+                               "session %" PRIu64, session->id);
+               /*
+                * Now we can clear the pending flag in the session. New
+                * rotations can start now.
+                */
+               session->rotate_pending_relay = false;
+       } else if (ret == 1) {
+               DBG("[rotation-thread] Rotation still pending on the relay for "
+                               "session %" PRIu64, session->id);
+               ret = sessiond_timer_rotate_pending_start(session,
+                               DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+               if (ret) {
+                       ERR("Re-enabling rotate pending timer");
+                       ret = -1;
+                       goto end;
+               }
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+static
+int handle_rotate_timer_pipe(uint32_t revents,
+               struct rotation_thread_handle *handle,
+               struct rotation_thread_state *state,
+               struct rotation_thread_timer_queue *queue)
+{
+       int ret = 0;
+       int fd = lttng_pipe_get_readfd(queue->event_pipe);
+       struct ltt_session *session;
+       char buf[1];
+
+       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+               ret = lttng_poll_del(&state->events, fd);
+               if (ret) {
+                       ERR("[rotation-thread] Failed to remove consumer "
+                                       "rotate pending pipe from poll set");
+               }
+               goto end;
+       }
+
+       ret = lttng_read(fd, buf, 1);
+       if (ret != 1) {
+               ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+               ret = -1;
+               goto end;
+       }
+
+       for (;;) {
+               struct sessiond_rotation_timer *timer_data;
+
+               /*
+                * Take the queue lock only to pop elements from the list.
+                */
+               pthread_mutex_lock(&queue->lock);
+               if (cds_list_empty(&queue->list)) {
+                       pthread_mutex_unlock(&queue->lock);
+                       break;
+               }
+               timer_data = cds_list_first_entry(&queue->list,
+                               struct sessiond_rotation_timer, head);
+               cds_list_del(&timer_data->head);
+               pthread_mutex_unlock(&queue->lock);
+
+               /*
+                * session lock to lookup the session ID.
+                */
+               session_lock_list();
+               session = session_find_by_id(timer_data->session_id);
+               if (!session) {
+                       DBG("[rotation-thread] Session %" PRIu64 " not found",
+                                       timer_data->session_id);
+                       /*
+                        * This is a non-fatal error, and we cannot report it to the
+                        * user (timer), so just print the error and continue the
+                        * processing.
+                        */
+                       session_unlock_list();
+                       free(timer_data);
+                       continue;
+               }
+
+               /*
+                * Take the session lock and release the session_list lock.
+                */
+               session_lock(session);
+               session_unlock_list();
+
+               if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
+                       ret = rotate_pending_relay_timer(session);
+               } else {
+                       ERR("Unknown signal in rotate timer %d", timer_data->signal);
+                       ret = -1;
+               }
+               session_unlock(session);
+               free(timer_data);
+               if (ret) {
+                       ERR("Error processing timer");
+                       goto end;
+               }
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
 void *thread_rotation(void *data)
 {
        int ret;
@@ -441,6 +579,13 @@ void *thread_rotation(void *data)
                        if (fd == handle->thread_quit_pipe) {
                                DBG("[rotation-thread] Quit pipe activity");
                                goto exit;
+                       } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
+                               ret = handle_rotate_timer_pipe(revents,
+                                               handle, &state, handle->rotation_timer_queue);
+                               if (ret) {
+                                       ERR("[rotation-thread] Failed to handle rotation timer pipe event");
+                                       goto error;
+                               }
                        } else if (fd == handle->ust32_consumer ||
                                        fd == handle->ust64_consumer ||
                                        fd == handle->kernel_consumer) {
index 942d68c131daff04de8c8faf786a335522522cd7..0f62ab49fc4acccd824e78e8bbaad3a205583660 100644 (file)
@@ -402,6 +402,7 @@ int session_create(char *name, uid_t uid, gid_t gid)
 
        new_session->rotate_pending = false;
        new_session->rotate_pending_relay = false;
+       new_session->rotate_relay_pending_timer_enabled = false;
 
        /* Add new session to the session list */
        session_lock_list();
index 78890db3cb24bcd786e1c448552e8347e52b9685..2e22885e801c092d23feee3825d2ac222a4befab 100644 (file)
@@ -167,6 +167,12 @@ struct ltt_session {
         * with the current timestamp.
         */
        time_t current_chunk_start_ts;
+       /*
+        * Timer to check periodically if a relay has completed the last
+        * rotation.
+        */
+       bool rotate_relay_pending_timer_enabled;
+       timer_t rotate_relay_pending_timer;
        /*
         * Keep a state if this session was rotated after the last stop command.
         * We only allow one rotation after a stop. At destroy, we also need to
index d7aaca0f11fd5c8b379b39020530d581c53ef769..0d500aa5522ae92bd41a645dc587cd05c44faba7 100644 (file)
@@ -51,6 +51,10 @@ void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigaddset exit");
        }
+       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+       if (ret) {
+               PERROR("sigaddset switch");
+       }
 }
 
 /*
@@ -179,6 +183,51 @@ end:
        return ret;
 }
 
+int sessiond_timer_rotate_pending_start(struct ltt_session *session,
+               unsigned int interval_us)
+{
+       int ret;
+
+       DBG("Enabling rotate pending timer on session %" PRIu64, session->id);
+       /*
+        * We arm this timer in a one-shot mode so we don't have to disable it
+        * explicitly (which could deadlock if the timer thread is blocked writing
+        * in the rotation_timer_pipe).
+        * Instead, we re-arm it if needed after the rotation_pending check as
+        * returned. Also, this timer is usually only needed once, so there is no
+        * need to go through the whole signal teardown scheme everytime.
+        */
+       ret = session_timer_start(&session->rotate_relay_pending_timer,
+                       session, interval_us,
+                       LTTNG_SESSIOND_SIG_ROTATE_PENDING,
+                       /* one-shot */ true);
+       if (ret == 0) {
+               session->rotate_relay_pending_timer_enabled = true;
+       }
+
+       return ret;
+}
+
+/*
+ * Stop and delete the channel's live timer.
+ * Called with session and session_list locks held.
+ */
+void sessiond_timer_rotate_pending_stop(struct ltt_session *session)
+{
+       int ret;
+
+       assert(session);
+
+       DBG("Disabling timer rotate pending on session %" PRIu64, session->id);
+       ret = session_timer_stop(&session->rotate_relay_pending_timer,
+                       LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+       if (ret == -1) {
+               ERR("Failed to stop rotate_pending timer");
+       }
+
+       session->rotate_relay_pending_timer_enabled = false;
+}
+
 /*
  * Block the RT signals for the entire process. It must be called from the
  * sessiond main before creating the threads
@@ -199,6 +248,116 @@ int sessiond_timer_signal_init(void)
        return 0;
 }
 
+/*
+ * Called with the rotation_timer_queue lock held.
+ * Return true if the same timer job already exists in the queue, false if not.
+ */
+static
+bool check_duplicate_timer_job(struct timer_thread_parameters *ctx,
+               struct ltt_session *session, unsigned int signal)
+{
+       bool ret = false;
+       struct sessiond_rotation_timer *node;
+
+       rcu_read_lock();
+       cds_list_for_each_entry(node, &ctx->rotation_timer_queue->list, head) {
+               if (node->session_id == session->id && node->signal == signal) {
+                       ret = true;
+                       goto end;
+               }
+       }
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Add the session ID and signal value to the rotation_timer_queue if it is
+ * not already there and wakeup the rotation thread. The rotation thread
+ * empties the whole queue everytime it is woken up. The event_pipe is
+ * non-blocking, if it would block, we just return because we know the
+ * rotation thread will be awaken anyway.
+ */
+static
+int enqueue_timer_rotate_job(struct timer_thread_parameters *ctx,
+               struct ltt_session *session, unsigned int signal)
+{
+       int ret;
+       bool has_duplicate_timer_job;
+       char *c = "!";
+
+       pthread_mutex_lock(&ctx->rotation_timer_queue->lock);
+       has_duplicate_timer_job = check_duplicate_timer_job(ctx, session,
+                       signal);
+
+       if (!has_duplicate_timer_job) {
+               struct sessiond_rotation_timer *timer_data = NULL;
+
+               timer_data = zmalloc(sizeof(struct sessiond_rotation_timer));
+               if (!timer_data) {
+                       PERROR("Allocation of timer data");
+                       goto error;
+               }
+               timer_data->session_id = session->id;
+               timer_data->signal = signal;
+               cds_list_add_tail(&timer_data->head,
+                               &ctx->rotation_timer_queue->list);
+       } else {
+               /*
+                * This timer job is already pending, we don't need to add
+                * it.
+                */
+               pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
+               ret = 0;
+               goto end;
+       }
+       pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
+
+       ret = lttng_write(
+                       lttng_pipe_get_writefd(ctx->rotation_timer_queue->event_pipe),
+                       c, 1);
+       if (ret < 0) {
+               /*
+                * We do not want to block in the timer handler, the job has been
+                * enqueued in the list, the wakeup pipe is probably full, the job
+                * will be processed when the rotation_thread catches up.
+                */
+               if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                       ret = 0;
+                       goto end;
+               }
+               PERROR("Timer wakeup rotation thread");
+               goto error;
+       }
+
+       ret = 0;
+       goto end;
+
+error:
+       ret = -1;
+end:
+       return ret;
+}
+
+/*
+ * Ask the rotation thread to check if the last rotation started in this
+ * session is still pending on the relay.
+ */
+static
+void relay_rotation_pending_timer(struct timer_thread_parameters *ctx,
+               int sig, siginfo_t *si)
+{
+       int ret;
+       struct ltt_session *session = si->si_value.sival_ptr;
+       assert(session);
+
+       ret = enqueue_timer_rotate_job(ctx, session, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+       if (ret) {
+               PERROR("wakeup rotate pipe");
+       }
+}
+
 /*
  * This thread is the sighandler for the timer signals.
  */
@@ -244,6 +403,8 @@ void *sessiond_timer_thread(void *data)
                        DBG("Signal timer metadata thread teardown");
                } else if (signr == LTTNG_SESSIOND_SIG_EXIT) {
                        goto end;
+               } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
+                       relay_rotation_pending_timer(ctx, info.si_signo, &info);
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
index e1b8a7eaded2d0a23ac34dad435611d319d90f19..5c84f3830599d18184eded95889be7917e9f71ef 100644 (file)
@@ -24,6 +24,7 @@
 
 #define LTTNG_SESSIOND_SIG_TEARDOWN            SIGRTMIN + 10
 #define LTTNG_SESSIOND_SIG_EXIT                        SIGRTMIN + 11
+#define LTTNG_SESSIOND_SIG_ROTATE_PENDING      SIGRTMIN + 12
 
 #define CLOCKID CLOCK_MONOTONIC
 
@@ -54,4 +55,8 @@ struct sessiond_rotation_timer {
 void *sessiond_timer_thread(void *data);
 int sessiond_timer_signal_init(void);
 
+int sessiond_timer_rotate_pending_start(struct ltt_session *session, unsigned int
+               interval_us);
+void sessiond_timer_rotate_pending_stop(struct ltt_session *session);
+
 #endif /* SESSIOND_TIMER_H */
index 3ac00cb030bdebc60a3c864a91107873fb683f63..3638273d0797d55f549d1047175f2087a3919c5f 100644 (file)
@@ -4389,6 +4389,27 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
        }
 }
 
+int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+               uint64_t relayd_id, uint64_t chunk_id)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd");
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
+end:
+       return ret;
+}
+
 static
 int mkdir_local(const char *path, uid_t uid, gid_t gid)
 {
index 083e1abc9f715d86b02101ec3259b47c9b4a9069..38ab1e146740538102b5c669d0a5571ec0c6f37c 100644 (file)
 #define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL"
 #define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD"
 
+/*
+ * Default timer value in usec for the rotate pending polling check on the
+ * relay when a rotation has completed on the consumer.
+ */
+#define DEFAULT_ROTATE_PENDING_RELAY_TIMER     CONFIG_DEFAULT_ROTATE_PENDING_RELAY_TIMER
+
 /*
  * Returns the default subbuf size.
  *
index 2eb1ebe5d2aa1bb555d10c64e817da53179e84d6..ec67e316d714bc1dc030dfb54b065f7e18352360 100644 (file)
@@ -1180,6 +1180,40 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
+       case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+       {
+               uint32_t pending_reply;
+
+               DBG("Consumer rotate pending on relay for session %" PRIu64,
+                               msg.u.rotate_pending_relay.session_id);
+               ret = lttng_consumer_rotate_pending_relay(
+                               msg.u.rotate_pending_relay.session_id,
+                               msg.u.rotate_pending_relay.relayd_id,
+                               msg.u.rotate_pending_relay.chunk_id);
+               if (ret < 0) {
+                       ERR("Rotate pending relay failed");
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               } else {
+                       pending_reply = !!ret;
+               }
+
+               health_code_update();
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &pending_reply,
+                               sizeof(pending_reply));
+               if (ret < 0) {
+                       PERROR("send data pending ret code");
+                       goto error_fatal;
+               }
+               break;
+       }
        case LTTNG_CONSUMER_MKDIR:
        {
                DBG("Consumer mkdir %s in session %" PRIu64,
index 242abbedff15c5c2f079460cb8805dbc567268cd..995e4880197550575667d6e18ca613645b1916b5 100644 (file)
@@ -1086,6 +1086,56 @@ error:
        return ret;
 }
 
+int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
+{
+       int ret;
+       struct lttcomm_relayd_rotate_pending msg;
+       struct lttcomm_relayd_rotate_pending_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
+                       chunk_id);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.chunk_id = htobe64(chunk_id);
+
+       /* Send command */
+       ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
+                       sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Receive response */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.generic.ret_code = be32toh(reply.generic.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.generic.ret_code != LTTNG_OK) {
+               ret = -reply.generic.ret_code;
+               ERR("Relayd rotate pending replied with error %d", ret);
+               goto error;
+       } else {
+               /* No error, just rotate pending state */
+               if (reply.is_pending == 0 || reply.is_pending == 1) {
+                       ret = reply.is_pending;
+                       DBG("Relayd rotate pending command completed successfully with result \"%s\"",
+                                       ret ? "rotation pending" : "rotation NOT pending");
+               } else {
+                       ret = -LTTNG_ERR_UNK;
+               }
+       }
+
+error:
+       return ret;
+}
+
 int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
 {
        int ret;
index f6329eb5b8f2b6912c87b224a12080a594ba2ef8..fb3e94224caf456daa8b0c1af7038d38c6d0e5e2 100644 (file)
@@ -55,6 +55,8 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
                const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num);
 int relayd_rotate_rename(struct lttcomm_relayd_sock *sock,
                const char *current_path, const char *new_path);
+int relayd_rotate_pending(struct lttcomm_relayd_sock *sock,
+               uint64_t chunk_id);
 int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path);
 
 #endif /* _RELAYD_H */
index e9a7e9ff2534a6783f80e8564a28063c8bea08b0..3bfa3eb28070a46d9d9503272735088862af7542 100644 (file)
@@ -217,6 +217,12 @@ struct lttcomm_relayd_rotate_pending {
        uint64_t chunk_id;
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_rotate_pending_reply {
+       struct lttcomm_relayd_generic_reply generic;
+       /* Valid values are [0, 1]. */
+       uint8_t is_pending;
+} LTTNG_PACKED;
+
 struct lttcomm_relayd_mkdir {
        /* Includes trailing NULL */
        uint32_t length;
index e931c6931aa6cc74a7cb5cf14fb785e44ffdbdbf..4eed95dec50779bb860a2143cca1a3af09445068 100644 (file)
@@ -31,6 +31,7 @@
 #include <lttng/save-internal.h>
 #include <lttng/channel-internal.h>
 #include <lttng/trigger/trigger-internal.h>
+#include <lttng/rotate-internal.h>
 #include <common/compat/socket.h>
 #include <common/uri.h>
 #include <common/defaults.h>
@@ -101,6 +102,7 @@ enum lttcomm_sessiond_command {
        LTTNG_REGISTER_TRIGGER              = 43,
        LTTNG_UNREGISTER_TRIGGER            = 44,
        LTTNG_ROTATE_SESSION                = 45,
+       LTTNG_ROTATE_PENDING                = 46,
 };
 
 enum lttcomm_relayd_command {
@@ -330,6 +332,9 @@ struct lttcomm_session_msg {
                struct {
                        uint32_t length;
                } LTTNG_PACKED trigger;
+               struct {
+                       uint64_t rotate_id;
+               } LTTNG_PACKED rotate_pending;
        } u;
 } LTTNG_PACKED;
 
@@ -558,6 +563,11 @@ struct lttcomm_consumer_msg {
                        uint32_t uid;
                        uint32_t gid;
                } LTTNG_PACKED rotate_rename;
+               struct {
+                       uint64_t relayd_id;
+                       uint64_t session_id;
+                       uint64_t chunk_id;
+               } LTTNG_PACKED rotate_pending_relay;
                struct {
                        char path[LTTNG_PATH_MAX];
                        uint64_t relayd_id; /* Relayd id if apply. */
index 6873273a3aa939ac166c97e939745238cce71ee5..b26cc19f923d23da7806be92069c395b7a99aaa3 100644 (file)
@@ -2010,7 +2010,31 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.rotate_rename.relayd_id);
                if (ret < 0) {
                        ERR("Rotate rename failed");
-                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+               }
+
+               health_code_update();
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               break;
+       }
+       case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+       {
+               uint32_t pending;
+
+               DBG("Consumer rotate pending on relay for session %" PRIu64,
+                               msg.u.rotate_pending_relay.session_id);
+               pending = lttng_consumer_rotate_pending_relay(
+                               msg.u.rotate_pending_relay.session_id,
+                               msg.u.rotate_pending_relay.relayd_id,
+                               msg.u.rotate_pending_relay.chunk_id);
+               if (pending < 0) {
+                       ERR("Rotate pending relay failed");
+                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                }
 
                health_code_update();
@@ -2020,6 +2044,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending));
+               if (ret < 0) {
+                       PERROR("send data pending ret code");
+                       goto error_fatal;
+               }
                break;
        }
        case LTTNG_CONSUMER_MKDIR:
@@ -2033,7 +2064,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.mkdir.relayd_id);
                if (ret < 0) {
                        ERR("consumer mkdir failed");
-                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                }
 
                health_code_update();
This page took 0.040894 seconds and 4 git commands to generate.