_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_CONTROL_BIND_ADDRESS], [0.0.0.0])
_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_DATA_BIND_ADDRESS], [0.0.0.0])
_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_VIEWER_BIND_ADDRESS], [localhost])
+_AC_DEFINE_AND_SUBST([DEFAULT_ROTATE_PENDING_RELAY_TIMER], [200000])
# Command short descriptions
_AC_DEFINE_QUOTED_AND_SUBST([CMD_DESCR_ADD_CONTEXT], [Add context fields to a channel])
{
struct relay_session *session = conn->session;
struct lttcomm_relayd_rotate_pending msg;
- struct lttcomm_relayd_generic_reply reply;
+ struct lttcomm_relayd_rotate_pending_reply reply;
struct lttng_ht_iter iter;
struct relay_stream *stream;
int ret = 0;
send_reply:
rcu_read_unlock();
memset(&reply, 0, sizeof(reply));
- reply.ret_code = htobe32(rotate_pending ? 1 : 0);
+ reply.generic.ret_code = htobe32((uint32_t) LTTNG_OK);
+ reply.is_pending = (uint8_t) !!rotate_pending;
network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
sizeof(reply), 0);
if (network_ret < (ssize_t) sizeof(reply)) {
goto error;
}
+ if (session->rotate_relay_pending_timer_enabled) {
+ sessiond_timer_rotate_pending_stop(session);
+ }
+
if (session->rotate_count > 0 && !session->rotate_pending) {
ret = rename_active_chunk(session);
if (ret) {
DBG("Begin destroy session %s (id %" PRIu64 ")", session->name, session->id);
+ if (session->rotate_relay_pending_timer_enabled) {
+ sessiond_timer_rotate_pending_stop(session);
+ }
+
/*
* The rename of the current chunk is performed at stop, but if we rotated
* the session after the previous stop command, we need to rename the
return ret;
}
+/*
+ * Ask the relay if a rotation is still pending. Must be called with the socket
+ * lock held.
+ *
+ * Return 1 if the rotation is still pending, 0 if finished, a negative value
+ * on error.
+ */
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
+ struct consumer_output *output, uint64_t session_id,
+ uint64_t chunk_id)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+ uint32_t pending = 0;
+
+ assert(socket);
+
+ DBG("Consumer rotate pending on relay for session %" PRIu64 ", chunk id % " PRIu64,
+ session_id, chunk_id);
+ assert(output->type == CONSUMER_DST_NET);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY;
+ msg.u.rotate_pending_relay.session_id = session_id;
+ msg.u.rotate_pending_relay.relayd_id = output->net_seq_index;
+ msg.u.rotate_pending_relay.chunk_id = chunk_id;
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = consumer_socket_recv(socket, &pending, sizeof(pending));
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = pending;
+
+error:
+ health_code_update();
+ return ret;
+}
+
/*
* Ask the consumer to create a directory.
*
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
const struct consumer_output *output, const char *old_path,
const char *new_path, uid_t uid, gid_t gid);
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
+ struct consumer_output *output, uint64_t session_id,
+ uint64_t chunk_id);
int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id,
const struct consumer_output *output, const char *path,
uid_t uid, gid_t gid);
#include <signal.h>
#include <inttypes.h>
+#include <lttng/rotate-internal.h>
+
#include "session.h"
#include "rotate.h"
#include "rotation-thread.h"
end:
return ret;
}
+
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct consumer_output *output;
+ struct lttng_ht_iter iter;
+
+ /*
+ * Either one of the sessions is enough to find the consumer_output
+ * and uid/gid.
+ */
+ if (session->kernel_session) {
+ output = session->kernel_session->consumer;
+ } else if (session->ust_session) {
+ output = session->ust_session->consumer;
+ } else {
+ assert(0);
+ }
+
+ if (!output || !output->socks) {
+ ERR("No consumer output found");
+ ret = -1;
+ goto end;
+ }
+
+ ret = -1;
+
+ rcu_read_lock();
+ /*
+ * We have to iterate to find a socket, but we only need to send the
+ * rotate pending command to one consumer, so we break after the first
+ * one.
+ */
+ cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket,
+ node.node) {
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_rotate_pending_relay(socket, output, session->id,
+ chunk_id);
+ pthread_mutex_unlock(socket->lock);
+ break;
+ }
+ rcu_read_unlock();
+
+end:
+ return ret;
+}
/* session lock must be held by this function's caller. */
int rename_complete_chunk(struct ltt_session *session, time_t ts);
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id);
+
/*
* When we start the rotation of a channel, we add its information in
* channel_pending_rotate_ht. This is called in the context of
session->rotate_pending = false;
session->rotation_status = LTTNG_ROTATION_STATUS_COMPLETED;
session->last_chunk_start_ts = session->current_chunk_start_ts;
+ if (session->rotate_pending_relay) {
+ ret = sessiond_timer_rotate_pending_start(
+ session,
+ DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+ if (ret) {
+ ERR("Failed to enable rotate pending timer");
+ ret = -1;
+ goto end_unlock_session;
+ }
+ }
DBG("Rotation completed for session %s", session->name);
}
return ret;
}
+/*
+ * Process the rotate_pending check, called with session lock held.
+ */
+static
+int rotate_pending_relay_timer(struct ltt_session *session)
+{
+ int ret;
+
+ DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
+ session->id);
+ ret = relay_rotate_pending(session, session->rotate_count - 1);
+ if (ret < 0) {
+ ERR("[rotation-thread] Check relay rotate pending");
+ goto end;
+ }
+ if (ret == 0) {
+ DBG("[rotation-thread] Rotation completed on the relay for "
+ "session %" PRIu64, session->id);
+ /*
+ * Now we can clear the pending flag in the session. New
+ * rotations can start now.
+ */
+ session->rotate_pending_relay = false;
+ } else if (ret == 1) {
+ DBG("[rotation-thread] Rotation still pending on the relay for "
+ "session %" PRIu64, session->id);
+ ret = sessiond_timer_rotate_pending_start(session,
+ DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+ if (ret) {
+ ERR("Re-enabling rotate pending timer");
+ ret = -1;
+ goto end;
+ }
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+static
+int handle_rotate_timer_pipe(uint32_t revents,
+ struct rotation_thread_handle *handle,
+ struct rotation_thread_state *state,
+ struct rotation_thread_timer_queue *queue)
+{
+ int ret = 0;
+ int fd = lttng_pipe_get_readfd(queue->event_pipe);
+ struct ltt_session *session;
+ char buf[1];
+
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ret = lttng_poll_del(&state->events, fd);
+ if (ret) {
+ ERR("[rotation-thread] Failed to remove consumer "
+ "rotate pending pipe from poll set");
+ }
+ goto end;
+ }
+
+ ret = lttng_read(fd, buf, 1);
+ if (ret != 1) {
+ ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+ ret = -1;
+ goto end;
+ }
+
+ for (;;) {
+ struct sessiond_rotation_timer *timer_data;
+
+ /*
+ * Take the queue lock only to pop elements from the list.
+ */
+ pthread_mutex_lock(&queue->lock);
+ if (cds_list_empty(&queue->list)) {
+ pthread_mutex_unlock(&queue->lock);
+ break;
+ }
+ timer_data = cds_list_first_entry(&queue->list,
+ struct sessiond_rotation_timer, head);
+ cds_list_del(&timer_data->head);
+ pthread_mutex_unlock(&queue->lock);
+
+ /*
+ * session lock to lookup the session ID.
+ */
+ session_lock_list();
+ session = session_find_by_id(timer_data->session_id);
+ if (!session) {
+ DBG("[rotation-thread] Session %" PRIu64 " not found",
+ timer_data->session_id);
+ /*
+ * This is a non-fatal error, and we cannot report it to the
+ * user (timer), so just print the error and continue the
+ * processing.
+ */
+ session_unlock_list();
+ free(timer_data);
+ continue;
+ }
+
+ /*
+ * Take the session lock and release the session_list lock.
+ */
+ session_lock(session);
+ session_unlock_list();
+
+ if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
+ ret = rotate_pending_relay_timer(session);
+ } else {
+ ERR("Unknown signal in rotate timer %d", timer_data->signal);
+ ret = -1;
+ }
+ session_unlock(session);
+ free(timer_data);
+ if (ret) {
+ ERR("Error processing timer");
+ goto end;
+ }
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
void *thread_rotation(void *data)
{
int ret;
if (fd == handle->thread_quit_pipe) {
DBG("[rotation-thread] Quit pipe activity");
goto exit;
+ } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
+ ret = handle_rotate_timer_pipe(revents,
+ handle, &state, handle->rotation_timer_queue);
+ if (ret) {
+ ERR("[rotation-thread] Failed to handle rotation timer pipe event");
+ goto error;
+ }
} else if (fd == handle->ust32_consumer ||
fd == handle->ust64_consumer ||
fd == handle->kernel_consumer) {
new_session->rotate_pending = false;
new_session->rotate_pending_relay = false;
+ new_session->rotate_relay_pending_timer_enabled = false;
/* Add new session to the session list */
session_lock_list();
* with the current timestamp.
*/
time_t current_chunk_start_ts;
+ /*
+ * Timer to check periodically if a relay has completed the last
+ * rotation.
+ */
+ bool rotate_relay_pending_timer_enabled;
+ timer_t rotate_relay_pending_timer;
/*
* Keep a state if this session was rotated after the last stop command.
* We only allow one rotation after a stop. At destroy, we also need to
if (ret) {
PERROR("sigaddset exit");
}
+ ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+ if (ret) {
+ PERROR("sigaddset switch");
+ }
}
/*
return ret;
}
+int sessiond_timer_rotate_pending_start(struct ltt_session *session,
+ unsigned int interval_us)
+{
+ int ret;
+
+ DBG("Enabling rotate pending timer on session %" PRIu64, session->id);
+ /*
+ * We arm this timer in a one-shot mode so we don't have to disable it
+ * explicitly (which could deadlock if the timer thread is blocked writing
+ * in the rotation_timer_pipe).
+ * Instead, we re-arm it if needed after the rotation_pending check as
+ * returned. Also, this timer is usually only needed once, so there is no
+ * need to go through the whole signal teardown scheme everytime.
+ */
+ ret = session_timer_start(&session->rotate_relay_pending_timer,
+ session, interval_us,
+ LTTNG_SESSIOND_SIG_ROTATE_PENDING,
+ /* one-shot */ true);
+ if (ret == 0) {
+ session->rotate_relay_pending_timer_enabled = true;
+ }
+
+ return ret;
+}
+
+/*
+ * Stop and delete the channel's live timer.
+ * Called with session and session_list locks held.
+ */
+void sessiond_timer_rotate_pending_stop(struct ltt_session *session)
+{
+ int ret;
+
+ assert(session);
+
+ DBG("Disabling timer rotate pending on session %" PRIu64, session->id);
+ ret = session_timer_stop(&session->rotate_relay_pending_timer,
+ LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+ if (ret == -1) {
+ ERR("Failed to stop rotate_pending timer");
+ }
+
+ session->rotate_relay_pending_timer_enabled = false;
+}
+
/*
* Block the RT signals for the entire process. It must be called from the
* sessiond main before creating the threads
return 0;
}
+/*
+ * Called with the rotation_timer_queue lock held.
+ * Return true if the same timer job already exists in the queue, false if not.
+ */
+static
+bool check_duplicate_timer_job(struct timer_thread_parameters *ctx,
+ struct ltt_session *session, unsigned int signal)
+{
+ bool ret = false;
+ struct sessiond_rotation_timer *node;
+
+ rcu_read_lock();
+ cds_list_for_each_entry(node, &ctx->rotation_timer_queue->list, head) {
+ if (node->session_id == session->id && node->signal == signal) {
+ ret = true;
+ goto end;
+ }
+ }
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Add the session ID and signal value to the rotation_timer_queue if it is
+ * not already there and wakeup the rotation thread. The rotation thread
+ * empties the whole queue everytime it is woken up. The event_pipe is
+ * non-blocking, if it would block, we just return because we know the
+ * rotation thread will be awaken anyway.
+ */
+static
+int enqueue_timer_rotate_job(struct timer_thread_parameters *ctx,
+ struct ltt_session *session, unsigned int signal)
+{
+ int ret;
+ bool has_duplicate_timer_job;
+ char *c = "!";
+
+ pthread_mutex_lock(&ctx->rotation_timer_queue->lock);
+ has_duplicate_timer_job = check_duplicate_timer_job(ctx, session,
+ signal);
+
+ if (!has_duplicate_timer_job) {
+ struct sessiond_rotation_timer *timer_data = NULL;
+
+ timer_data = zmalloc(sizeof(struct sessiond_rotation_timer));
+ if (!timer_data) {
+ PERROR("Allocation of timer data");
+ goto error;
+ }
+ timer_data->session_id = session->id;
+ timer_data->signal = signal;
+ cds_list_add_tail(&timer_data->head,
+ &ctx->rotation_timer_queue->list);
+ } else {
+ /*
+ * This timer job is already pending, we don't need to add
+ * it.
+ */
+ pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
+ ret = 0;
+ goto end;
+ }
+ pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
+
+ ret = lttng_write(
+ lttng_pipe_get_writefd(ctx->rotation_timer_queue->event_pipe),
+ c, 1);
+ if (ret < 0) {
+ /*
+ * We do not want to block in the timer handler, the job has been
+ * enqueued in the list, the wakeup pipe is probably full, the job
+ * will be processed when the rotation_thread catches up.
+ */
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ ret = 0;
+ goto end;
+ }
+ PERROR("Timer wakeup rotation thread");
+ goto error;
+ }
+
+ ret = 0;
+ goto end;
+
+error:
+ ret = -1;
+end:
+ return ret;
+}
+
+/*
+ * Ask the rotation thread to check if the last rotation started in this
+ * session is still pending on the relay.
+ */
+static
+void relay_rotation_pending_timer(struct timer_thread_parameters *ctx,
+ int sig, siginfo_t *si)
+{
+ int ret;
+ struct ltt_session *session = si->si_value.sival_ptr;
+ assert(session);
+
+ ret = enqueue_timer_rotate_job(ctx, session, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+ if (ret) {
+ PERROR("wakeup rotate pipe");
+ }
+}
+
/*
* This thread is the sighandler for the timer signals.
*/
DBG("Signal timer metadata thread teardown");
} else if (signr == LTTNG_SESSIOND_SIG_EXIT) {
goto end;
+ } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
+ relay_rotation_pending_timer(ctx, info.si_signo, &info);
} else {
ERR("Unexpected signal %d\n", info.si_signo);
}
#define LTTNG_SESSIOND_SIG_TEARDOWN SIGRTMIN + 10
#define LTTNG_SESSIOND_SIG_EXIT SIGRTMIN + 11
+#define LTTNG_SESSIOND_SIG_ROTATE_PENDING SIGRTMIN + 12
#define CLOCKID CLOCK_MONOTONIC
void *sessiond_timer_thread(void *data);
int sessiond_timer_signal_init(void);
+int sessiond_timer_rotate_pending_start(struct ltt_session *session, unsigned int
+ interval_us);
+void sessiond_timer_rotate_pending_stop(struct ltt_session *session);
+
#endif /* SESSIOND_TIMER_H */
}
}
+int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+ uint64_t relayd_id, uint64_t chunk_id)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd");
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
+end:
+ return ret;
+}
+
static
int mkdir_local(const char *path, uid_t uid, gid_t gid)
{
#define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL"
#define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD"
+/*
+ * Default timer value in usec for the rotate pending polling check on the
+ * relay when a rotation has completed on the consumer.
+ */
+#define DEFAULT_ROTATE_PENDING_RELAY_TIMER CONFIG_DEFAULT_ROTATE_PENDING_RELAY_TIMER
+
/*
* Returns the default subbuf size.
*
}
break;
}
+ case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+ {
+ uint32_t pending_reply;
+
+ DBG("Consumer rotate pending on relay for session %" PRIu64,
+ msg.u.rotate_pending_relay.session_id);
+ ret = lttng_consumer_rotate_pending_relay(
+ msg.u.rotate_pending_relay.session_id,
+ msg.u.rotate_pending_relay.relayd_id,
+ msg.u.rotate_pending_relay.chunk_id);
+ if (ret < 0) {
+ ERR("Rotate pending relay failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ pending_reply = !!ret;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending_reply,
+ sizeof(pending_reply));
+ if (ret < 0) {
+ PERROR("send data pending ret code");
+ goto error_fatal;
+ }
+ break;
+ }
case LTTNG_CONSUMER_MKDIR:
{
DBG("Consumer mkdir %s in session %" PRIu64,
return ret;
}
+int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
+{
+ int ret;
+ struct lttcomm_relayd_rotate_pending msg;
+ struct lttcomm_relayd_rotate_pending_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(rsock);
+
+ DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
+ chunk_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.chunk_id = htobe64(chunk_id);
+
+ /* Send command */
+ ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
+ sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Receive response */
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.generic.ret_code = be32toh(reply.generic.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.generic.ret_code != LTTNG_OK) {
+ ret = -reply.generic.ret_code;
+ ERR("Relayd rotate pending replied with error %d", ret);
+ goto error;
+ } else {
+ /* No error, just rotate pending state */
+ if (reply.is_pending == 0 || reply.is_pending == 1) {
+ ret = reply.is_pending;
+ DBG("Relayd rotate pending command completed successfully with result \"%s\"",
+ ret ? "rotation pending" : "rotation NOT pending");
+ } else {
+ ret = -LTTNG_ERR_UNK;
+ }
+ }
+
+error:
+ return ret;
+}
+
int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
{
int ret;
const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num);
int relayd_rotate_rename(struct lttcomm_relayd_sock *sock,
const char *current_path, const char *new_path);
+int relayd_rotate_pending(struct lttcomm_relayd_sock *sock,
+ uint64_t chunk_id);
int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path);
#endif /* _RELAYD_H */
uint64_t chunk_id;
} LTTNG_PACKED;
+struct lttcomm_relayd_rotate_pending_reply {
+ struct lttcomm_relayd_generic_reply generic;
+ /* Valid values are [0, 1]. */
+ uint8_t is_pending;
+} LTTNG_PACKED;
+
struct lttcomm_relayd_mkdir {
/* Includes trailing NULL */
uint32_t length;
#include <lttng/save-internal.h>
#include <lttng/channel-internal.h>
#include <lttng/trigger/trigger-internal.h>
+#include <lttng/rotate-internal.h>
#include <common/compat/socket.h>
#include <common/uri.h>
#include <common/defaults.h>
LTTNG_REGISTER_TRIGGER = 43,
LTTNG_UNREGISTER_TRIGGER = 44,
LTTNG_ROTATE_SESSION = 45,
+ LTTNG_ROTATE_PENDING = 46,
};
enum lttcomm_relayd_command {
struct {
uint32_t length;
} LTTNG_PACKED trigger;
+ struct {
+ uint64_t rotate_id;
+ } LTTNG_PACKED rotate_pending;
} u;
} LTTNG_PACKED;
uint32_t uid;
uint32_t gid;
} LTTNG_PACKED rotate_rename;
+ struct {
+ uint64_t relayd_id;
+ uint64_t session_id;
+ uint64_t chunk_id;
+ } LTTNG_PACKED rotate_pending_relay;
struct {
char path[LTTNG_PATH_MAX];
uint64_t relayd_id; /* Relayd id if apply. */
msg.u.rotate_rename.relayd_id);
if (ret < 0) {
ERR("Rotate rename failed");
- ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+ {
+ uint32_t pending;
+
+ DBG("Consumer rotate pending on relay for session %" PRIu64,
+ msg.u.rotate_pending_relay.session_id);
+ pending = lttng_consumer_rotate_pending_relay(
+ msg.u.rotate_pending_relay.session_id,
+ msg.u.rotate_pending_relay.relayd_id,
+ msg.u.rotate_pending_relay.chunk_id);
+ if (pending < 0) {
+ ERR("Rotate pending relay failed");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
}
health_code_update();
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending));
+ if (ret < 0) {
+ PERROR("send data pending ret code");
+ goto error_fatal;
+ }
break;
}
case LTTNG_CONSUMER_MKDIR:
msg.u.mkdir.relayd_id);
if (ret < 0) {
ERR("consumer mkdir failed");
- ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
}
health_code_update();