_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_DATA_BIND_ADDRESS], [0.0.0.0])
_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_VIEWER_BIND_ADDRESS], [localhost])
_AC_DEFINE_AND_SUBST([DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE], [134217728])
-_AC_DEFINE_AND_SUBST([DEFAULT_ROTATE_PENDING_RELAY_TIMER], [200000])
+_AC_DEFINE_AND_SUBST([DEFAULT_ROTATE_PENDING_TIMER], [500000])
# Command short descriptions
_AC_DEFINE_QUOTED_AND_SUBST([CMD_DESCR_ADD_CONTEXT], [Add context fields to a channel])
session->rotated_after_last_stop = false;
if (session->rotate_timer_period) {
- ret = sessiond_rotate_timer_start(session,
+ ret = timer_session_rotation_schedule_timer_start(session,
session->rotate_timer_period);
if (ret < 0) {
ERR("Failed to enable rotate timer");
goto end;
}
- ret = rename_complete_chunk(session, time(NULL));
+ ret = rename_completed_chunk(session, time(NULL));
if (ret < 0) {
- ERR("Failed to rename current rotate path");
+ ERR("Failed to rename current rotation's path");
goto end;
}
goto error;
}
- if (session->rotate_relay_pending_timer_enabled) {
- sessiond_timer_rotate_pending_stop(session);
+ if (session->rotation_pending_check_timer_enabled) {
+ if (timer_session_rotation_pending_check_stop(session)) {
+ ERR("Failed to stop the \"rotation pending check\" timer of session %s",
+ session->name);
+ }
}
- if (session->rotate_timer_enabled) {
- sessiond_rotate_timer_stop(session);
+ if (session->rotation_schedule_timer_enabled) {
+ if (timer_session_rotation_schedule_timer_stop(
+ session)) {
+ ERR("Failed to stop the \"rotation schedule\" timer of session %s",
+ session->name);
+ }
}
- if (session->current_archive_id > 0 && !session->rotate_pending) {
+ if (session->current_archive_id > 0 &&
+ session->rotation_state != LTTNG_ROTATION_STATE_ONGOING) {
ret = rename_active_chunk(session);
if (ret) {
/*
DBG("Begin destroy session %s (id %" PRIu64 ")", session->name, session->id);
- if (session->rotate_relay_pending_timer_enabled) {
- sessiond_timer_rotate_pending_stop(session);
+ if (session->rotation_pending_check_timer_enabled) {
+ if (timer_session_rotation_pending_check_stop(session)) {
+ ERR("Failed to stop the \"rotation pending check\" timer of session %s",
+ session->name);
+ }
}
- if (session->rotate_timer_enabled) {
- sessiond_rotate_timer_stop(session);
+ if (session->rotation_schedule_timer_enabled) {
+ if (timer_session_rotation_schedule_timer_stop(
+ session)) {
+ ERR("Failed to stop the \"rotation schedule\" timer of session %s",
+ session->name);
+ }
}
if (session->rotate_size) {
}
}
- /*
- * A rotation is still pending, we have to wait.
- */
- if (session->rotate_pending) {
+ /* A rotation is still pending, we have to wait. */
+ if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
DBG("Rotate still pending for session %s", session->name);
ret = 1;
goto error;
struct tm *timeinfo;
char datetime[21];
time_t now;
- bool ust_active = false;
assert(session);
goto end;
}
- if (session->rotate_pending || session->rotate_pending_relay) {
+ if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
ret = -LTTNG_ERR_ROTATION_PENDING;
- DBG("Rotate already in progress");
+ DBG("Refusing to launch a rotation; a rotation is already in progress for session %s",
+ session->name);
goto end;
}
if (session->current_archive_id == 0) {
const char *base_path = NULL;
+ assert(session->kernel_session || session->ust_session);
/* Either one of the two sessions is enough to get the root path. */
- if (session->kernel_session) {
- base_path = session_get_base_path(session);
- } else if (session->ust_session) {
- base_path = session_get_base_path(session);
- } else {
- assert(0);
- }
+ base_path = session_get_base_path(session);
assert(base_path);
+
ret = lttng_strncpy(session->rotation_chunk.current_rotate_path,
base_path,
sizeof(session->rotation_chunk.current_rotate_path));
}
DBG("Current rotate path %s", session->rotation_chunk.current_rotate_path);
+ /*
+ * Channels created after this point will belong to the next
+ * archive id.
+ */
session->current_archive_id++;
- session->rotate_pending = true;
+ /*
+ * A rotation has a local step even if the destination is a relay
+ * daemon; the buffers must be consumed by the consumer daemon.
+ */
+ session->rotation_pending_local = true;
+ session->rotation_pending_relay =
+ session_get_consumer_destination_type(session) == CONSUMER_DST_NET;
session->rotation_state = LTTNG_ROTATION_STATE_ONGOING;
ret = notification_thread_command_session_rotation_ongoing(
notification_thread_handle,
session->name, session->uid, session->gid,
- session->current_archive_id);
+ session->current_archive_id - 1);
if (ret != LTTNG_OK) {
ERR("Failed to notify notification thread that a session rotation is ongoing for session %s",
session->name);
}
- /*
- * Create the path name for the next chunk.
- */
+ /* Create the path name for the next chunk. */
now = time(NULL);
if (now == (time_t) -1) {
ret = -LTTNG_ERR_ROTATION_NOT_AVAILABLE;
ret = -LTTNG_ERR_CREATE_DIR_FAIL;
goto end;
}
- ret = ust_app_rotate_session(session, &ust_active);
+ ret = ust_app_rotate_session(session);
if (ret != LTTNG_OK) {
goto end;
}
- /*
- * Handle the case where we did not start a rotation on any channel.
- * The consumer will never wake up the rotation thread to perform the
- * rename, so we have to do it here while we hold the session and
- * session_list locks.
- */
- if (!session->kernel_session && !ust_active) {
- struct lttng_trace_archive_location *location;
-
- session->rotate_pending = false;
- session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
- ret = rename_complete_chunk(session, now);
- if (ret < 0) {
- ERR("Failed to rename completed rotation chunk");
- goto end;
- }
+ }
- /* Ownership of location is transferred. */
- location = session_get_trace_archive_location(session);
- ret = notification_thread_command_session_rotation_completed(
- notification_thread_handle,
- session->name,
- session->uid,
- session->gid,
- session->current_archive_id,
- location);
- if (ret != LTTNG_OK) {
- ERR("Failed to notify notification thread that rotation is complete for session %s",
- session->name);
- }
- }
+ ret = timer_session_rotation_pending_check_start(session,
+ DEFAULT_ROTATE_PENDING_TIMER);
+ if (ret) {
+ goto end;
}
if (!session->active) {
rotate_return->rotation_id = session->current_archive_id;
}
- DBG("Cmd rotate session %s, current_archive_id %" PRIu64 " sent",
- session->name, session->current_archive_id);
+ DBG("Cmd rotate session %s, archive_id %" PRIu64 " sent",
+ session->name, session->current_archive_id - 1);
ret = LTTNG_OK;
end:
* Only start the timer if the session is active,
* otherwise it will be started when the session starts.
*/
- ret = sessiond_rotate_timer_start(session, new_value);
+ ret = timer_session_rotation_schedule_timer_start(
+ session, new_value);
if (ret) {
ERR("Failed to enable session rotation timer in ROTATION_SET_SCHEDULE command");
ret = LTTNG_ERR_UNK;
goto end;
}
} else {
- ret = sessiond_rotate_timer_stop(session);
+ ret = timer_session_rotation_schedule_timer_stop(
+ session);
if (ret) {
ERR("Failed to disable session rotation timer in ROTATION_SET_SCHEDULE command");
ret = LTTNG_ERR_UNK;
/*
* Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
+ * 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
pipe_name = "channel monitor";
command_name = "SET_CHANNEL_MONITOR_PIPE";
break;
- case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
- pipe_name = "channel rotate";
- command_name = "SET_CHANNEL_ROTATE_PIPE";
- break;
default:
ERR("Unexpected command received in %s (cmd = %d)", __func__,
(int) cmd);
LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe);
}
-int consumer_send_channel_rotate_pipe(struct consumer_socket *consumer_sock,
- int pipe)
-{
- return consumer_send_pipe(consumer_sock,
- LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE, pipe);
-}
-
/*
* Set consumer subdirectory using the session name and a generated datetime if
* needed. This is appended to the current subdirectory.
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
char *domain_path, bool is_metadata_channel,
- uint64_t new_chunk_id,
- bool *rotate_pending_relay)
+ uint64_t new_chunk_id)
{
int ret;
struct lttcomm_consumer_msg msg;
ret = -1;
goto error;
}
- *rotate_pending_relay = true;
} else {
msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
ret = snprintf(msg.u.rotate_channel.pathname,
}
/*
- * Ask the relay if a rotation is still pending. Must be called with the socket
- * lock held.
+ * Ask the consumer if a rotation is locally pending. Must be called with the
+ * socket lock held.
+ *
+ * Return 1 if the rotation is still pending, 0 if finished, a negative value
+ * on error.
+ */
+int consumer_check_rotation_pending_local(struct consumer_socket *socket,
+ uint64_t session_id, uint64_t chunk_id)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+ uint32_t pending = 0;
+
+ assert(socket);
+
+ DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64,
+ session_id, chunk_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL;
+ msg.u.check_rotation_pending_local.session_id = session_id;
+ msg.u.check_rotation_pending_local.chunk_id = chunk_id;
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = consumer_socket_recv(socket, &pending, sizeof(pending));
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = pending;
+
+error:
+ health_code_update();
+ return ret;
+}
+
+/*
+ * Ask the consumer if a rotation is pending on the relayd. Must be called with
+ * the socket lock held.
*
* Return 1 if the rotation is still pending, 0 if finished, a negative value
* on error.
*/
-int consumer_rotate_pending_relay(struct consumer_socket *socket,
- struct consumer_output *output, uint64_t session_id,
+int consumer_check_rotation_pending_relay(struct consumer_socket *socket,
+ const struct consumer_output *output, uint64_t session_id,
uint64_t chunk_id)
{
int ret;
assert(socket);
- DBG("Consumer rotate pending on relay for session %" PRIu64 ", chunk id %" PRIu64,
+ DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64,
session_id, chunk_id);
assert(output->type == CONSUMER_DST_NET);
memset(&msg, 0, sizeof(msg));
- msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY;
- msg.u.rotate_pending_relay.session_id = session_id;
- msg.u.rotate_pending_relay.relayd_id = output->net_seq_index;
- msg.u.rotate_pending_relay.chunk_id = chunk_id;
+ msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY;
+ msg.u.check_rotation_pending_relay.session_id = session_id;
+ msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index;
+ msg.u.check_rotation_pending_relay.chunk_id = chunk_id;
health_code_update();
ret = consumer_send_msg(socket, &msg);
* consumer.
*/
int channel_monitor_pipe;
- /*
- * Write-end of the channel rotation pipe to be passed to the
- * consumer.
- */
- int channel_rotate_pipe;
/*
* The metadata socket object is handled differently and only created
* locally in this object thus it's the only reference available in the
char *session_name, char *hostname, int session_live_timer);
int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
int pipe);
-int consumer_send_channel_rotate_pipe(struct consumer_socket *consumer_sock,
- int pipe);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
struct consumer_output *consumer);
int consumer_recv_status_reply(struct consumer_socket *sock);
uid_t uid, gid_t gid, const char *session_path, int wait,
uint64_t nb_packets_per_stream, uint64_t trace_archive_id);
+/* Rotation commands. */
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
- char *domain_path, bool is_metadata_channel, uint64_t new_chunk_id,
- bool *rotate_pending_relay);
+ char *domain_path, bool is_metadata_channel, uint64_t new_chunk_id);
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
const struct consumer_output *output, const char *old_path,
const char *new_path, uid_t uid, gid_t gid);
-int consumer_rotate_pending_relay(struct consumer_socket *socket,
- struct consumer_output *output, uint64_t session_id,
+int consumer_check_rotation_pending_local(struct consumer_socket *socket,
+ uint64_t session_id, uint64_t chunk_id);
+int consumer_check_rotation_pending_relay(struct consumer_socket *socket,
+ const struct consumer_output *output, uint64_t session_id,
uint64_t chunk_id);
int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id,
const struct consumer_output *output, const char *path,
socket, node.node) {
struct ltt_kernel_channel *chan;
- /*
- * Account the metadata channel first to make sure the
- * number of channels waiting for a rotation cannot
- * reach 0 before we complete the iteration over all
- * the channels.
- */
- ret = rotate_add_channel_pending(ksess->metadata->key,
- LTTNG_DOMAIN_KERNEL, session);
- if (ret < 0) {
- ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
- goto error;
- }
-
/* For each channel, ask the consumer to rotate it. */
cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
- ret = rotate_add_channel_pending(chan->key,
- LTTNG_DOMAIN_KERNEL, session);
- if (ret < 0) {
- ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
- goto error;
- }
-
- DBG("Rotate channel %" PRIu64 ", session %s", chan->key, session->name);
+ DBG("Rotate kernel channel %" PRIu64 ", session %s",
+ chan->key, session->name);
ret = consumer_rotate_channel(socket, chan->key,
ksess->uid, ksess->gid, ksess->consumer,
ksess->consumer->subdir,
/* is_metadata_channel */ false,
- session->current_archive_id,
- &session->rotate_pending_relay);
+ session->current_archive_id);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
goto error;
ksess->uid, ksess->gid, ksess->consumer,
ksess->consumer->subdir,
/* is_metadata_channel */ true,
- session->current_archive_id,
- &session->rotate_pending_relay);
+ session->current_archive_id);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
goto error;
.err_sock = -1,
.cmd_sock = -1,
.channel_monitor_pipe = -1,
- .channel_rotate_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
.err_sock = -1,
.cmd_sock = -1,
.channel_monitor_pipe = -1,
- .channel_rotate_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
.err_sock = -1,
.cmd_sock = -1,
.channel_monitor_pipe = -1,
- .channel_rotate_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
PERROR("UST consumerd64 channel monitor pipe close");
}
}
- if (kconsumer_data.channel_rotate_pipe >= 0) {
- ret = close(kconsumer_data.channel_rotate_pipe);
- if (ret < 0) {
- PERROR("kernel consumer channel rotate pipe close");
- }
- }
- if (ustconsumer32_data.channel_rotate_pipe >= 0) {
- ret = close(ustconsumer32_data.channel_rotate_pipe);
- if (ret < 0) {
- PERROR("UST consumerd32 channel rotate pipe close");
- }
- }
- if (ustconsumer64_data.channel_rotate_pipe >= 0) {
- ret = close(ustconsumer64_data.channel_rotate_pipe);
- if (ret < 0) {
- PERROR("UST consumerd64 channel rotate pipe close");
- }
- }
}
/*
/*
* Transfer the write-end of the channel monitoring and rotate pipe
- * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE and
- * SET_CHANNEL_ROTATE_PIPE commands.
+ * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
*/
cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
if (!cmd_socket_wrapper) {
goto error;
}
- ret = consumer_send_channel_rotate_pipe(cmd_socket_wrapper,
- consumer_data->channel_rotate_pipe);
- if (ret) {
- goto error;
- }
-
/* Discard the socket wrapper as it is no longer needed. */
consumer_destroy_socket(cmd_socket_wrapper);
cmd_socket_wrapper = NULL;
}
/*
- * Setup socket for consumer 64 bit. No need for atomic access
+ * Setup socket for consumer 32 bit. No need for atomic access
* since it was set above and can ONLY be set in this thread.
*/
ret = consumer_create_socket(&ustconsumer32_data,
return ret;
}
-static
-struct rotation_thread_timer_queue *create_rotate_timer_queue(void)
-{
- struct rotation_thread_timer_queue *queue = NULL;
-
- queue = zmalloc(sizeof(struct rotation_thread_timer_queue));
- if (!queue) {
- PERROR("Failed to allocate timer rotate queue");
- goto end;
- }
-
- queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
- CDS_INIT_LIST_HEAD(&queue->list);
- pthread_mutex_init(&queue->lock, NULL);
-
-end:
- return queue;
-}
-
-static
-void destroy_rotate_timer_queue(struct rotation_thread_timer_queue *queue)
-{
- struct sessiond_rotation_timer *node, *tmp_node;
-
- if (!queue) {
- return;
- }
-
- lttng_pipe_destroy(queue->event_pipe);
-
- pthread_mutex_lock(&queue->lock);
- /* Empty wait queue. */
- cds_list_for_each_entry_safe(node, tmp_node, &queue->list, head) {
- cds_list_del(&node->head);
- free(node);
- }
- pthread_mutex_unlock(&queue->lock);
-
- pthread_mutex_destroy(&queue->lock);
- free(queue);
-}
-
/*
* main
*/
bool notification_thread_launched = false;
bool rotation_thread_launched = false;
bool timer_thread_launched = false;
- struct lttng_pipe *ust32_channel_rotate_pipe = NULL,
- *ust64_channel_rotate_pipe = NULL,
- *kernel_channel_rotate_pipe = NULL;
struct timer_thread_parameters timer_thread_ctx;
/* Queue of rotation jobs populated by the sessiond-timer. */
struct rotation_thread_timer_queue *rotation_timer_queue = NULL;
goto exit_set_signal_handler;
}
- if (sessiond_timer_signal_init()) {
+ if (timer_signal_init()) {
retval = -1;
goto exit_set_signal_handler;
}
retval = -1;
goto exit_init_data;
}
- kernel_channel_rotate_pipe = lttng_pipe_open(0);
- if (!kernel_channel_rotate_pipe) {
- ERR("Failed to create kernel consumer channel rotate pipe");
- retval = -1;
- goto exit_init_data;
- }
- kconsumer_data.channel_rotate_pipe =
- lttng_pipe_release_writefd(
- kernel_channel_rotate_pipe);
- if (kconsumer_data.channel_rotate_pipe < 0) {
- retval = -1;
- goto exit_init_data;
- }
}
/* Set consumer initial state */
retval = -1;
goto exit_init_data;
}
- ust32_channel_rotate_pipe = lttng_pipe_open(0);
- if (!ust32_channel_rotate_pipe) {
- ERR("Failed to create 32-bit user space consumer channel rotate pipe");
- retval = -1;
- goto exit_init_data;
- }
- ustconsumer32_data.channel_rotate_pipe = lttng_pipe_release_writefd(
- ust32_channel_rotate_pipe);
- if (ustconsumer32_data.channel_rotate_pipe < 0) {
- retval = -1;
- goto exit_init_data;
- }
/*
- * The rotation_timer_queue structure is shared between the sessiond timer
- * thread and the rotation thread. The main() keeps the ownership and
- * destroys it when both threads have quit.
+ * The rotation_thread_timer_queue structure is shared between the
+ * sessiond timer thread and the rotation thread. The main thread keeps
+ * its ownership and destroys it when both threads have been joined.
*/
- rotation_timer_queue = create_rotate_timer_queue();
+ rotation_timer_queue = rotation_thread_timer_queue_create();
if (!rotation_timer_queue) {
retval = -1;
goto exit_init_data;
}
- timer_thread_ctx.rotation_timer_queue = rotation_timer_queue;
+ timer_thread_ctx.rotation_thread_job_queue = rotation_timer_queue;
ust64_channel_monitor_pipe = lttng_pipe_open(0);
if (!ust64_channel_monitor_pipe) {
retval = -1;
goto exit_init_data;
}
- ust64_channel_rotate_pipe = lttng_pipe_open(0);
- if (!ust64_channel_rotate_pipe) {
- ERR("Failed to create 64-bit user space consumer channel rotate pipe");
- retval = -1;
- goto exit_init_data;
- }
- ustconsumer64_data.channel_rotate_pipe = lttng_pipe_release_writefd(
- ust64_channel_rotate_pipe);
- if (ustconsumer64_data.channel_rotate_pipe < 0) {
- retval = -1;
- goto exit_init_data;
- }
/*
* Init UST app hash table. Alloc hash table before this point since
/* Create timer thread. */
ret = pthread_create(&timer_thread, default_pthread_attr(),
- sessiond_timer_thread, &timer_thread_ctx);
+ timer_thread_func, &timer_thread_ctx);
if (ret) {
errno = ret;
PERROR("pthread_create timer");
/* rotation_thread_data acquires the pipes' read side. */
rotation_thread_handle = rotation_thread_handle_create(
- ust32_channel_rotate_pipe,
- ust64_channel_rotate_pipe,
- kernel_channel_rotate_pipe,
thread_quit_pipe[0],
rotation_timer_queue,
notification_thread_handle,
}
if (timer_thread_launched) {
- kill(getpid(), LTTNG_SESSIOND_SIG_EXIT);
+ timer_exit();
ret = pthread_join(timer_thread, &status);
if (ret) {
errno = ret;
* After the rotation and timer thread have quit, we can safely destroy
* the rotation_timer_queue.
*/
- destroy_rotate_timer_queue(rotation_timer_queue);
+ rotation_thread_timer_queue_destroy(rotation_timer_queue);
rcu_thread_offline();
rcu_unregister_thread();
lttng_pipe_destroy(ust32_channel_monitor_pipe);
lttng_pipe_destroy(ust64_channel_monitor_pipe);
lttng_pipe_destroy(kernel_channel_monitor_pipe);
- lttng_pipe_destroy(ust32_channel_rotate_pipe);
- lttng_pipe_destroy(ust64_channel_rotate_pipe);
- lttng_pipe_destroy(kernel_channel_rotate_pipe);
exit_ht_cleanup:
health_app_destroy(health_sessiond);
/*
* Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
#include <urcu/list.h>
#include <urcu/rculfhash.h>
-unsigned long hash_channel_key(struct rotation_channel_key *key)
-{
- return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
- (void *) (unsigned long) key->domain, lttng_ht_seed);
-}
-
-int rotate_add_channel_pending(uint64_t key, enum lttng_domain_type domain,
- struct ltt_session *session)
-{
- int ret;
- struct rotation_channel_info *new_info;
- struct rotation_channel_key channel_key = { .key = key,
- .domain = domain };
-
- new_info = zmalloc(sizeof(struct rotation_channel_info));
- if (!new_info) {
- goto error;
- }
-
- new_info->channel_key.key = key;
- new_info->channel_key.domain = domain;
- new_info->session_id = session->id;
- cds_lfht_node_init(&new_info->rotate_channels_ht_node);
-
- session->nr_chan_rotate_pending++;
- cds_lfht_add(channel_pending_rotate_ht,
- hash_channel_key(&channel_key),
- &new_info->rotate_channels_ht_node);
-
- ret = 0;
- goto end;
-
-error:
- ret = -1;
-end:
- return ret;
-}
-
/* The session's lock must be held by the caller. */
static
int session_rename_chunk(struct ltt_session *session, char *current_path,
*
* Returns 0 on success, a negative value on error.
*/
-int rename_complete_chunk(struct ltt_session *session, time_t ts)
+int rename_completed_chunk(struct ltt_session *session, time_t ts)
{
struct tm *timeinfo;
char new_path[LTTNG_PATH_MAX];
return ret;
}
-int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id)
-{
- int ret;
- struct consumer_socket *socket;
- struct consumer_output *output;
- struct lttng_ht_iter iter;
-
- /*
- * Either one of the sessions is enough to find the consumer_output
- * and uid/gid.
- */
- if (session->kernel_session) {
- output = session->kernel_session->consumer;
- } else if (session->ust_session) {
- output = session->ust_session->consumer;
- } else {
- assert(0);
- }
-
- if (!output || !output->socks) {
- ERR("No consumer output found");
- ret = -1;
- goto end;
- }
-
- ret = -1;
-
- rcu_read_lock();
- /*
- * We have to iterate to find a socket, but we only need to send the
- * rotate pending command to one consumer, so we break after the first
- * one.
- */
- cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket,
- node.node) {
- pthread_mutex_lock(socket->lock);
- ret = consumer_rotate_pending_relay(socket, output, session->id,
- chunk_id);
- pthread_mutex_unlock(socket->lock);
- break;
- }
- rcu_read_unlock();
-
-end:
- return ret;
-}
-
int subscribe_session_consumed_size_rotation(struct ltt_session *session, uint64_t size,
struct notification_thread_handle *notification_thread_handle)
{
/*
* Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
#ifndef ROTATE_H
#define ROTATE_H
-#include <lttng/notification/channel-internal.h>
#include "rotation-thread.h"
+#include <stdint.h>
-/*
- * Key in channel_pending_rotate_ht to map a channel to a
- * struct rotation_channel_info.
- */
-struct rotation_channel_key {
- uint64_t key;
- enum lttng_domain_type domain;
-};
-
-/*
- * Added in channel_pending_rotate_ht everytime we start the rotation of a
- * channel. The consumer notifies the rotation thread with the channel_key to
- * inform a rotation is complete, we use that information to lookup the related
- * session from channel_pending_rotate_ht.
- */
-struct rotation_channel_info {
- uint64_t session_id;
- struct rotation_channel_key channel_key;
- struct cds_lfht_node rotate_channels_ht_node;
-};
-
-
-extern struct cds_lfht *channel_pending_rotate_ht;
-extern struct lttng_notification_channel *rotate_notification_channel;
-
-unsigned long hash_channel_key(struct rotation_channel_key *key);
-
-/* session lock must be held by this function's caller. */
-int rename_complete_chunk(struct ltt_session *session, time_t ts);
-
-int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id);
-
-/*
- * When we start the rotation of a channel, we add its information in
- * channel_pending_rotate_ht. This is called in the context of
- * thread_manage_client when the client asks for a rotation, in the context
- * of the sessiond_timer thread when periodic rotations are enabled and from
- * the rotation_thread when size-based rotations are enabled.
- */
-int rotate_add_channel_pending(uint64_t key, enum lttng_domain_type domain,
- struct ltt_session *session);
+int rotate_add_pending_rotation(struct ltt_session *session, uint64_t chunk_id);
+int rename_completed_chunk(struct ltt_session *session, time_t ts);
/*
* Subscribe/unsubscribe the notification_channel from the rotation_thread to
/*
* Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
#include <urcu.h>
#include <urcu/list.h>
-#include <urcu/rculfhash.h>
-
-/*
- * Store a struct rotation_channel_info for each channel that is currently
- * being rotated by the consumer.
- */
-struct cds_lfht *channel_pending_rotate_ht;
struct lttng_notification_channel *rotate_notification_channel = NULL;
-struct rotation_thread_state {
+struct rotation_thread {
struct lttng_poll_event events;
};
+struct rotation_thread_job {
+ enum rotation_thread_job_type type;
+ uint64_t session_id;
+ /* List member in struct rotation_thread_timer_queue. */
+ struct cds_list_head head;
+};
+
+/*
+ * The timer thread enqueues jobs and wakes up the rotation thread.
+ * When the rotation thread wakes up, it empties the queue.
+ */
+struct rotation_thread_timer_queue {
+ struct lttng_pipe *event_pipe;
+ struct cds_list_head list;
+ pthread_mutex_t lock;
+};
+
+struct rotation_thread_handle {
+ int quit_pipe;
+ struct rotation_thread_timer_queue *rotation_timer_queue;
+ /* Access to the notification thread cmd_queue */
+ struct notification_thread_handle *notification_thread_handle;
+ sem_t *notification_thread_ready;
+};
+
static
-void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
+const char *get_job_type_str(enum rotation_thread_job_type job_type)
{
- assert(channel_info);
- free(channel_info);
+ switch (job_type) {
+ case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+ return "CHECK_PENDING_ROTATION";
+ case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+ return "SCHEDULED_ROTATION";
+ default:
+ abort();
+ }
}
-static
-int match_channel_info(struct cds_lfht_node *node, const void *key)
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void)
{
- struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
- struct rotation_channel_info *channel_info;
+ struct rotation_thread_timer_queue *queue = NULL;
- channel_info = caa_container_of(node, struct rotation_channel_info,
- rotate_channels_ht_node);
+ queue = zmalloc(sizeof(*queue));
+ if (!queue) {
+ PERROR("Failed to allocate timer rotate queue");
+ goto end;
+ }
- return !!((channel_key->key == channel_info->channel_key.key) &&
- (channel_key->domain == channel_info->channel_key.domain));
+ queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
+ CDS_INIT_LIST_HEAD(&queue->list);
+ pthread_mutex_init(&queue->lock, NULL);
+end:
+ return queue;
}
-static
-struct rotation_channel_info *lookup_channel_pending(uint64_t key,
- enum lttng_domain_type domain)
+void log_job_destruction(const struct rotation_thread_job *job)
{
- struct cds_lfht_iter iter;
- struct cds_lfht_node *node;
- struct rotation_channel_info *channel_info = NULL;
- struct rotation_channel_key channel_key = { .key = key,
- .domain = domain };
-
- cds_lfht_lookup(channel_pending_rotate_ht,
- hash_channel_key(&channel_key),
- match_channel_info,
- &channel_key, &iter);
- node = cds_lfht_iter_get_node(&iter);
- if (!node) {
- goto end;
+ enum lttng_error_level log_level;
+ const char *job_type_str = get_job_type_str(job->type);
+
+ switch (job->type) {
+ case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+ /*
+ * Not a problem, the scheduled rotation is racing with the teardown
+ * of the daemon. In this case, the rotation will not happen, which
+ * is not a problem (or at least, not important enough to delay
+ * the shutdown of the session daemon).
+ */
+ log_level = PRINT_DBG;
+ break;
+ case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+ /* This is not expected to happen; warn the user. */
+ log_level = PRINT_WARN;
+ break;
+ default:
+ abort();
}
- channel_info = caa_container_of(node, struct rotation_channel_info,
- rotate_channels_ht_node);
- cds_lfht_del(channel_pending_rotate_ht, node);
-end:
- return channel_info;
+ LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session %" PRIu64 " on destruction",
+ job_type_str, job->session_id);
}
-/*
- * Destroy the thread data previously created by the init function.
- */
-void rotation_thread_handle_destroy(
- struct rotation_thread_handle *handle)
+void rotation_thread_timer_queue_destroy(
+ struct rotation_thread_timer_queue *queue)
{
- int ret;
+ struct rotation_thread_job *job, *tmp_job;
- if (!handle) {
- goto end;
+ if (!queue) {
+ return;
}
- if (handle->ust32_consumer >= 0) {
- ret = close(handle->ust32_consumer);
- if (ret) {
- PERROR("close 32-bit consumer channel rotation pipe");
- }
- }
- if (handle->ust64_consumer >= 0) {
- ret = close(handle->ust64_consumer);
- if (ret) {
- PERROR("close 64-bit consumer channel rotation pipe");
- }
- }
- if (handle->kernel_consumer >= 0) {
- ret = close(handle->kernel_consumer);
- if (ret) {
- PERROR("close kernel consumer channel rotation pipe");
- }
+ lttng_pipe_destroy(queue->event_pipe);
+
+ pthread_mutex_lock(&queue->lock);
+ /* Empty wait queue. */
+ cds_list_for_each_entry_safe(job, tmp_job, &queue->list, head) {
+ log_job_destruction(job);
+ cds_list_del(&job->head);
+ free(job);
}
+ pthread_mutex_unlock(&queue->lock);
+ pthread_mutex_destroy(&queue->lock);
+ free(queue);
+}
-end:
+/*
+ * Destroy the thread data previously created by the init function.
+ */
+void rotation_thread_handle_destroy(
+ struct rotation_thread_handle *handle)
+{
free(handle);
}
struct rotation_thread_handle *rotation_thread_handle_create(
- struct lttng_pipe *ust32_channel_rotate_pipe,
- struct lttng_pipe *ust64_channel_rotate_pipe,
- struct lttng_pipe *kernel_channel_rotate_pipe,
- int thread_quit_pipe,
+ int quit_pipe,
struct rotation_thread_timer_queue *rotation_timer_queue,
struct notification_thread_handle *notification_thread_handle,
sem_t *notification_thread_ready)
goto end;
}
- if (ust32_channel_rotate_pipe) {
- handle->ust32_consumer =
- lttng_pipe_release_readfd(
- ust32_channel_rotate_pipe);
- if (handle->ust32_consumer < 0) {
- goto error;
+ handle->quit_pipe = quit_pipe;
+ handle->rotation_timer_queue = rotation_timer_queue;
+ handle->notification_thread_handle = notification_thread_handle;
+ handle->notification_thread_ready = notification_thread_ready;
+
+end:
+ return handle;
+}
+
+/*
+ * Called with the rotation_thread_timer_queue lock held.
+ * Return true if the same timer job already exists in the queue, false if not.
+ */
+static
+bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
+ enum rotation_thread_job_type job_type, uint64_t session_id)
+{
+ bool exists = false;
+ struct rotation_thread_job *job;
+
+ cds_list_for_each_entry(job, &queue->list, head) {
+ if (job->session_id == session_id && job->type == job_type) {
+ exists = true;
+ goto end;
}
- } else {
- handle->ust32_consumer = -1;
}
- if (ust64_channel_rotate_pipe) {
- handle->ust64_consumer =
- lttng_pipe_release_readfd(
- ust64_channel_rotate_pipe);
- if (handle->ust64_consumer < 0) {
- goto error;
- }
- } else {
- handle->ust64_consumer = -1;
+end:
+ return exists;
+}
+
+void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
+ enum rotation_thread_job_type job_type, uint64_t session_id)
+{
+ int ret;
+ const char * const dummy = "!";
+ struct rotation_thread_job *job = NULL;
+ const char *job_type_str = get_job_type_str(job_type);
+
+ pthread_mutex_lock(&queue->lock);
+ if (timer_job_exists(queue, session_id, job_type)) {
+ /*
+ * This timer job is already pending, we don't need to add
+ * it.
+ */
+ goto end;
}
- if (kernel_channel_rotate_pipe) {
- handle->kernel_consumer =
- lttng_pipe_release_readfd(
- kernel_channel_rotate_pipe);
- if (handle->kernel_consumer < 0) {
- goto error;
+
+ job = zmalloc(sizeof(struct rotation_thread_job));
+ if (!job) {
+ PERROR("Failed to allocate rotation thread job of type \"%s\" for session id %" PRIu64,
+ job_type_str, session_id);
+ goto end;
+ }
+ job->type = job_type;
+ job->session_id = session_id;
+ cds_list_add_tail(&job->head, &queue->list);
+
+ ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), dummy,
+ 1);
+ if (ret < 0) {
+ /*
+ * We do not want to block in the timer handler, the job has
+ * been enqueued in the list, the wakeup pipe is probably full,
+ * the job will be processed when the rotation_thread catches
+ * up.
+ */
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /*
+ * Not an error, but would be surprising and indicate
+ * that the rotation thread can't keep up with the
+ * current load.
+ */
+ DBG("Wake-up pipe of rotation thread job queue is full");
+ goto end;
}
- } else {
- handle->kernel_consumer = -1;
+ PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session id %" PRIu64,
+ job_type_str, session_id);
+ goto end;
}
- handle->thread_quit_pipe = thread_quit_pipe;
- handle->rotation_timer_queue = rotation_timer_queue;
- handle->notification_thread_handle = notification_thread_handle;
- handle->notification_thread_ready = notification_thread_ready;
end:
- return handle;
-error:
- rotation_thread_handle_destroy(handle);
- return NULL;
+ pthread_mutex_unlock(&queue->lock);
}
static
int ret;
/*
- * Create pollset with size 5:
- * - sessiond quit pipe
- * - sessiond timer pipe,
- * - consumerd (32-bit user space) channel rotate pipe,
- * - consumerd (64-bit user space) channel rotate pipe,
- * - consumerd (kernel) channel rotate pipe,
+ * Create pollset with size 2:
+ * - quit pipe,
+ * - rotation thread timer queue pipe,
*/
- ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
+ ret = lttng_poll_create(poll_set, 2, LTTNG_CLOEXEC);
if (ret < 0) {
goto end;
}
- ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
+ ret = lttng_poll_add(poll_set, handle->quit_pipe,
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
+ ERR("[rotation-thread] Failed to add quit_pipe fd to pollset");
goto error;
}
ret = lttng_poll_add(poll_set,
ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
goto error;
}
- ret = lttng_poll_add(poll_set, handle->ust32_consumer,
- LPOLLIN | LPOLLERR);
- if (ret < 0) {
- ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
- goto error;
- }
- ret = lttng_poll_add(poll_set, handle->ust64_consumer,
- LPOLLIN | LPOLLERR);
- if (ret < 0) {
- ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
- goto error;
- }
- if (handle->kernel_consumer >= 0) {
- ret = lttng_poll_add(poll_set, handle->kernel_consumer,
- LPOLLIN | LPOLLERR);
- if (ret < 0) {
- ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
- goto error;
- }
- }
end:
return ret;
}
static
-void fini_thread_state(struct rotation_thread_state *state)
+void fini_thread_state(struct rotation_thread *state)
{
- int ret;
-
lttng_poll_clean(&state->events);
- ret = cds_lfht_destroy(channel_pending_rotate_ht, NULL);
- assert(!ret);
if (rotate_notification_channel) {
lttng_notification_channel_destroy(rotate_notification_channel);
}
static
int init_thread_state(struct rotation_thread_handle *handle,
- struct rotation_thread_state *state)
+ struct rotation_thread *state)
{
int ret;
goto end;
}
- channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
- 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
- if (!channel_pending_rotate_ht) {
- ERR("[rotation-thread] Failed to create channel pending rotation hash table");
- ret = -1;
- goto end;
- }
-
/*
* We wait until the notification thread is ready to create the
* notification channel and add it to the poll_set.
}
static
-int handle_channel_rotation_pipe(int fd, uint32_t revents,
- struct rotation_thread_handle *handle,
- struct rotation_thread_state *state)
+int check_session_rotation_pending_local_on_consumer(
+ const struct ltt_session *session,
+ struct consumer_socket *socket, bool *rotation_completed)
{
- int ret = 0;
- enum lttng_domain_type domain;
- struct rotation_channel_info *channel_info;
- struct ltt_session *session = NULL;
- uint64_t key;
-
- if (fd == handle->ust32_consumer ||
- fd == handle->ust64_consumer) {
- domain = LTTNG_DOMAIN_UST;
- } else if (fd == handle->kernel_consumer) {
- domain = LTTNG_DOMAIN_KERNEL;
+ int ret;
+
+ pthread_mutex_lock(socket->lock);
+ DBG("[rotation-thread] Checking for locally pending rotation on the %s consumer for session %s",
+ lttng_consumer_type_str(socket->type),
+ session->name);
+ ret = consumer_check_rotation_pending_local(socket,
+ session->id,
+ session->current_archive_id - 1);
+ pthread_mutex_unlock(socket->lock);
+
+ if (ret == 0) {
+ /* Rotation was completed on this consumer. */
+ DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" was completed on the %s consumer",
+ session->current_archive_id - 1,
+ session->name,
+ lttng_consumer_type_str(socket->type));
+ *rotation_completed = true;
+ } else if (ret == 1) {
+ /* Rotation pending on this consumer. */
+ DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
+ session->current_archive_id - 1,
+ session->name,
+ lttng_consumer_type_str(socket->type));
+ *rotation_completed = false;
+ ret = 0;
} else {
- ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
- fd);
- abort();
+ /* Not a fatal error. */
+ ERR("[rotation-thread] Encountered an error when checking if local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
+ session->current_archive_id - 1,
+ session->name,
+ lttng_consumer_type_str(socket->type));
+ *rotation_completed = false;
}
+ return ret;
+}
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ret = lttng_poll_del(&state->events, fd);
- if (ret) {
- ERR("[rotation-thread] Failed to remove consumer "
- "rotation pipe from poll set");
+static
+int check_session_rotation_pending_local(struct ltt_session *session)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct cds_lfht_iter iter;
+ bool rotation_completed = true;
+
+ /*
+ * Check for a local pending rotation on all consumers (32-bit
+ * user space, 64-bit user space, and kernel).
+ */
+ DBG("[rotation-thread] Checking for pending local rotation on session \"%s\", trace archive %" PRIu64,
+ session->name, session->current_archive_id - 1);
+
+ rcu_read_lock();
+ if (!session->ust_session) {
+ goto skip_ust;
+ }
+ cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht,
+ &iter, socket, node.node) {
+ ret = check_session_rotation_pending_local_on_consumer(session,
+ socket, &rotation_completed);
+ if (ret || !rotation_completed) {
+ goto end;
}
- goto end;
}
- do {
- ret = read(fd, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret != sizeof(key)) {
- ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
- fd);
- ret = -1;
- goto end;
+skip_ust:
+ if (!session->kernel_session) {
+ goto skip_kernel;
}
+ cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht,
+ &iter, socket, node.node) {
+ ret = check_session_rotation_pending_local_on_consumer(session,
+ socket, &rotation_completed);
+ if (ret || !rotation_completed) {
+ goto end;
+ }
+ }
+skip_kernel:
+end:
+ rcu_read_unlock();
- DBG("[rotation-thread] Received notification for chan %" PRIu64
- ", domain %d", key, domain);
-
- channel_info = lookup_channel_pending(key, domain);
- if (!channel_info) {
- ERR("[rotation-thread] Failed to find channel_info (key = %"
- PRIu64 ")", key);
- ret = -1;
- goto end;
+ if (rotation_completed) {
+ DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
+ session->current_archive_id - 1,
+ session->name);
+ session->rotation_pending_local = false;
}
- rcu_read_lock();
- session_lock_list();
- session = session_find_by_id(channel_info->session_id);
- if (!session) {
- /*
- * The session may have been destroyed before we had a chance to
- * perform this action, return gracefully.
- */
- DBG("[rotation-thread] Session %" PRIu64 " not found",
- channel_info->session_id);
- ret = 0;
- goto end_unlock_session_list;
+ if (ret) {
+ session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
}
+ return 0;
+}
- session_lock(session);
- if (--session->nr_chan_rotate_pending == 0) {
- time_t now = time(NULL);
-
- if (now == (time_t) -1) {
- session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
- ret = LTTNG_ERR_UNK;
- goto end_unlock_session;
- }
+static
+int check_session_rotation_pending_relay(struct ltt_session *session)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct cds_lfht_iter iter;
+ bool rotation_completed = true;
+ const struct consumer_output *output;
- ret = rename_complete_chunk(session, now);
- if (ret < 0) {
- ERR("Failed to rename completed rotation chunk");
- goto end_unlock_session;
- }
- session->rotate_pending = false;
- session->last_chunk_start_ts = session->current_chunk_start_ts;
- if (session->rotate_pending_relay) {
- ret = sessiond_timer_rotate_pending_start(
- session,
- DEFAULT_ROTATE_PENDING_RELAY_TIMER);
- if (ret) {
- ERR("Failed to enable rotate pending timer");
- ret = -1;
- goto end_unlock_session;
- }
- } else {
- struct lttng_trace_archive_location *location;
-
- session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
- /* Ownership of location is transferred. */
- location = session_get_trace_archive_location(session);
- ret = notification_thread_command_session_rotation_completed(
- notification_thread_handle,
- session->name,
- session->uid,
- session->gid,
- session->current_archive_id,
- location);
- if (ret != LTTNG_OK) {
- ERR("Failed to notify notification thread that rotation is complete for session %s",
- session->name);
- }
+ /*
+ * Check for a pending rotation on any consumer as we only use
+ * it as a "tunnel" to the relayd.
+ */
- }
- DBG("Rotation completed for session %s", session->name);
+ rcu_read_lock();
+ if (session->ust_session) {
+ cds_lfht_first(session->ust_session->consumer->socks->ht,
+ &iter);
+ output = session->ust_session->consumer;
+ } else {
+ cds_lfht_first(session->kernel_session->consumer->socks->ht,
+ &iter);
+ output = session->kernel_session->consumer;
}
+ assert(cds_lfht_iter_get_node(&iter));
- ret = 0;
+ socket = caa_container_of(cds_lfht_iter_get_node(&iter),
+ typeof(*socket), node.node);
+
+ pthread_mutex_lock(socket->lock);
+ DBG("[rotation-thread] Checking for pending relay rotation on session \"%s\", trace archive %" PRIu64 " through the %s consumer",
+ session->name, session->current_archive_id - 1,
+ lttng_consumer_type_str(socket->type));
+ ret = consumer_check_rotation_pending_relay(socket,
+ output,
+ session->id,
+ session->current_archive_id - 1);
+ pthread_mutex_unlock(socket->lock);
+
+ if (ret == 0) {
+ /* Rotation was completed on the relay. */
+ DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" was completed",
+ session->current_archive_id - 1,
+ session->name);
+ } else if (ret == 1) {
+ /* Rotation pending on relay. */
+ DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" is pending",
+ session->current_archive_id - 1,
+ session->name);
+ rotation_completed = false;
+ } else {
+ /* Not a fatal error. */
+ ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
+ session->current_archive_id - 1,
+ session->name);
+ session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ rotation_completed = false;
+ }
-end_unlock_session:
- channel_rotation_info_destroy(channel_info);
- session_unlock(session);
-end_unlock_session_list:
- session_unlock_list();
rcu_read_unlock();
-end:
- return ret;
+
+ if (rotation_completed) {
+ DBG("[rotation-thread] Totation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
+ session->current_archive_id - 1,
+ session->name);
+ session->rotation_pending_relay = false;
+ }
+ return 0;
}
/*
- * Process the rotate_pending check, called with session lock held.
+ * Check if the last rotation was completed, called with session lock held.
*/
static
-int rotate_pending_relay_timer(struct ltt_session *session)
+int check_session_rotation_pending(struct ltt_session *session,
+ struct notification_thread_handle *notification_thread_handle)
{
int ret;
+ struct lttng_trace_archive_location *location;
+ time_t now;
- DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
- session->id);
- ret = relay_rotate_pending(session, session->current_archive_id - 1);
- if (ret < 0) {
- ERR("[rotation-thread] Check relay rotate pending");
- goto end;
- }
- if (ret == 0) {
- struct lttng_trace_archive_location *location;
+ DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
+ session->name, session->current_archive_id - 1);
+
+ if (session->rotation_pending_local) {
+ /* Updates session->rotation_pending_local as needed. */
+ ret = check_session_rotation_pending_local(session);
+ if (ret) {
+ goto end;
+ }
- DBG("[rotation-thread] Rotation completed on the relay for "
- "session %" PRIu64, session->id);
/*
- * Now we can clear the pending flag in the session. New
- * rotations can start now.
+ * No need to check for a pending rotation on the relay
+ * since the rotation is not even completed locally yet.
*/
- session->rotate_pending_relay = false;
- session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
-
- session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
- /* Ownership of location is transferred. */
- location = session_get_trace_archive_location(session);
- ret = notification_thread_command_session_rotation_completed(
- notification_thread_handle,
- session->name,
- session->uid,
- session->gid,
- session->current_archive_id,
- location);
- if (ret != LTTNG_OK) {
- ERR("Failed to notify notification thread that rotation is complete for session %s",
- session->name);
+ if (session->rotation_pending_local) {
+ goto end;
}
- } else if (ret == 1) {
- DBG("[rotation-thread] Rotation still pending on the relay for "
- "session %" PRIu64, session->id);
- ret = sessiond_timer_rotate_pending_start(session,
- DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+ }
+
+ if (session->rotation_pending_relay) {
+ /* Updates session->rotation_pending_relay as needed. */
+ ret = check_session_rotation_pending_relay(session);
if (ret) {
- ERR("Re-enabling rotate pending timer");
- ret = -1;
+ goto end;
+ }
+
+ if (session->rotation_pending_relay) {
goto end;
}
}
- ret = 0;
+ DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " completed for "
+ "session %s", session->current_archive_id - 1,
+ session->name);
+ /* Rename the completed trace archive's location. */
+ now = time(NULL);
+ if (now == (time_t) -1) {
+ session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ ret = LTTNG_ERR_UNK;
+ goto end;
+ }
+
+ ret = rename_completed_chunk(session, now);
+ if (ret < 0) {
+ ERR("Failed to rename completed rotation chunk");
+ goto end;
+ }
+ session->last_chunk_start_ts = session->current_chunk_start_ts;
+
+ /*
+ * Now we can clear the "ONGOING" state in the session. New
+ * rotations can start now.
+ */
+ session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
+
+ /* Ownership of location is transferred. */
+ location = session_get_trace_archive_location(session);
+ ret = notification_thread_command_session_rotation_completed(
+ notification_thread_handle,
+ session->name,
+ session->uid,
+ session->gid,
+ session->current_archive_id,
+ location);
+ if (ret != LTTNG_OK) {
+ ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+ session->name);
+ }
+
+ ret = 0;
end:
+ if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+ DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s",
+ session->current_archive_id - 1, session->name);
+ ret = timer_session_rotation_pending_check_start(session,
+ DEFAULT_ROTATE_PENDING_TIMER);
+ if (ret) {
+ ERR("Re-enabling rotate pending timer");
+ ret = -1;
+ goto end;
+ }
+ }
+
return ret;
}
-/*
- * Process the rotate_timer, called with session lock held.
- */
+/* Call with the session lock held. */
static
-int rotate_timer(struct ltt_session *session)
+int launch_session_rotation(struct ltt_session *session)
{
int ret;
+ struct lttng_rotate_session_return rotation_return;
- /*
- * Complete _at most_ one scheduled rotation on a stopped session.
- */
- if (!session->active && session->rotate_timer_enabled &&
- session->rotated_after_last_stop) {
- ret = 0;
- goto end;
- }
+ DBG("[rotation-thread] Launching scheduled time-based rotation on session \"%s\"",
+ session->name);
- /* Ignore this timer if a rotation is already in progress. */
- if (session->rotate_pending || session->rotate_pending_relay) {
- ret = 0;
- goto end;
+ ret = cmd_rotate_session(session, &rotation_return);
+ if (ret == LTTNG_OK) {
+ DBG("[rotation-thread] Scheduled time-based rotation successfully launched on session \"%s\"",
+ session->name);
+ } else {
+ /* Don't consider errors as fatal. */
+ DBG("[rotation-thread] Scheduled time-based rotation aborted for session %s: %s",
+ session->name, lttng_strerror(ret));
}
+ return 0;
+}
- DBG("[rotation-thread] Rotate timer on session %s", session->name);
+static
+int run_job(struct rotation_thread_job *job, struct ltt_session *session,
+ struct notification_thread_handle *notification_thread_handle)
+{
+ int ret;
- ret = cmd_rotate_session(session, NULL);
- if (ret == -LTTNG_ERR_ROTATION_PENDING) {
- DBG("Scheduled rotation aborted since a rotation is already in progress");
- ret = 0;
- goto end;
- } else if (ret != LTTNG_OK) {
- ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret);
- ret = -1;
- goto end;
+ switch (job->type) {
+ case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+ ret = launch_session_rotation(session);
+ break;
+ case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+ ret = check_session_rotation_pending(session,
+ notification_thread_handle);
+ break;
+ default:
+ abort();
}
-
- ret = 0;
-
-end:
return ret;
}
static
-int handle_rotate_timer_pipe(uint32_t revents,
- struct rotation_thread_handle *handle,
- struct rotation_thread_state *state,
+int handle_job_queue(struct rotation_thread_handle *handle,
+ struct rotation_thread *state,
struct rotation_thread_timer_queue *queue)
{
int ret = 0;
int fd = lttng_pipe_get_readfd(queue->event_pipe);
struct ltt_session *session;
- char buf[1];
+ char buf;
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ret = lttng_poll_del(&state->events, fd);
- if (ret) {
- ERR("[rotation-thread] Failed to remove consumer "
- "rotate pending pipe from poll set");
- }
- goto end;
- }
-
- ret = lttng_read(fd, buf, 1);
+ ret = lttng_read(fd, &buf, 1);
if (ret != 1) {
ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
ret = -1;
}
for (;;) {
- struct sessiond_rotation_timer *timer_data;
+ struct rotation_thread_job *job;
- /*
- * Take the queue lock only to pop elements from the list.
- */
+ /* Take the queue lock only to pop an element from the list. */
pthread_mutex_lock(&queue->lock);
if (cds_list_empty(&queue->list)) {
pthread_mutex_unlock(&queue->lock);
break;
}
- timer_data = cds_list_first_entry(&queue->list,
- struct sessiond_rotation_timer, head);
- cds_list_del(&timer_data->head);
+ job = cds_list_first_entry(&queue->list,
+ typeof(*job), head);
+ cds_list_del(&job->head);
pthread_mutex_unlock(&queue->lock);
- /*
- * session lock to lookup the session ID.
- */
session_lock_list();
- session = session_find_by_id(timer_data->session_id);
+ session = session_find_by_id(job->session_id);
if (!session) {
DBG("[rotation-thread] Session %" PRIu64 " not found",
- timer_data->session_id);
+ job->session_id);
/*
- * This is a non-fatal error, and we cannot report it to the
- * user (timer), so just print the error and continue the
- * processing.
+ * This is a non-fatal error, and we cannot report it to
+ * the user (timer), so just print the error and
+ * continue the processing.
+ *
+ * While the timer thread will purge pending signals for
+ * a session on the session's destruction, it is
+ * possible for a job targeting that session to have
+ * already been queued before it was destroyed.
*/
session_unlock_list();
- free(timer_data);
+ free(job);
continue;
}
- /*
- * Take the session lock and release the session_list lock.
- */
session_lock(session);
session_unlock_list();
- if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
- ret = rotate_pending_relay_timer(session);
- } else if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
- ret = rotate_timer(session);
- } else {
- ERR("Unknown signal in rotate timer %d", timer_data->signal);
- ret = -1;
- }
+ ret = run_job(job, session, handle->notification_thread_handle);
session_unlock(session);
- free(timer_data);
+ free(job);
if (ret) {
- ERR("Error processing timer");
goto end;
}
}
return ret;
}
-int handle_condition(
- const struct lttng_condition *condition,
+static
+int handle_condition(const struct lttng_condition *condition,
const struct lttng_evaluation *evaluation,
struct notification_thread_handle *notification_thread_handle)
{
}
static
-int handle_notification_channel(int fd, uint32_t revents,
+int handle_notification_channel(int fd,
struct rotation_thread_handle *handle,
- struct rotation_thread_state *state)
+ struct rotation_thread *state)
{
int ret;
bool notification_pending;
{
int ret;
struct rotation_thread_handle *handle = data;
- struct rotation_thread_state state;
+ struct rotation_thread thread;
DBG("[rotation-thread] Started rotation thread");
goto end;
}
- rcu_register_thread();
- rcu_thread_online();
-
health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
health_code_update();
- ret = init_thread_state(handle, &state);
+ ret = init_thread_state(handle, &thread);
if (ret) {
goto end;
}
health_poll_entry();
DBG("[rotation-thread] Entering poll wait");
- ret = lttng_poll_wait(&state.events, -1);
+ ret = lttng_poll_wait(&thread.events, -1);
DBG("[rotation-thread] Poll wait returned (%i)", ret);
health_poll_exit();
if (ret < 0) {
fd_count = ret;
for (i = 0; i < fd_count; i++) {
- int fd = LTTNG_POLL_GETFD(&state.events, i);
- uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
+ int fd = LTTNG_POLL_GETFD(&thread.events, i);
+ uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
DBG("[rotation-thread] Handling fd (%i) activity (%u)",
fd, revents);
- if (fd == handle->thread_quit_pipe) {
+ if (revents & LPOLLERR) {
+ ERR("[rotation-thread] Polling returned an error on fd %i", fd);
+ goto error;
+ }
+
+ if (fd == handle->quit_pipe) {
DBG("[rotation-thread] Quit pipe activity");
+ /* TODO flush the queue. */
goto exit;
} else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
- ret = handle_rotate_timer_pipe(revents,
- handle, &state, handle->rotation_timer_queue);
+ ret = handle_job_queue(handle, &thread,
+ handle->rotation_timer_queue);
if (ret) {
ERR("[rotation-thread] Failed to handle rotation timer pipe event");
goto error;
}
- } else if (fd == handle->ust32_consumer ||
- fd == handle->ust64_consumer ||
- fd == handle->kernel_consumer) {
- ret = handle_channel_rotation_pipe(fd,
- revents, handle, &state);
- if (ret) {
- ERR("[rotation-thread] Failed to handle channel rotation pipe");
- goto error;
- }
} else if (fd == rotate_notification_channel->socket) {
- ret = handle_notification_channel(fd, revents,
- handle, &state);
+ ret = handle_notification_channel(fd, handle,
+ &thread);
if (ret) {
ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
goto error;
exit:
error:
DBG("[rotation-thread] Exit");
- fini_thread_state(&state);
+ fini_thread_state(&thread);
health_unregister(health_sessiond);
- rcu_thread_offline();
- rcu_unregister_thread();
end:
return NULL;
}
/*
* Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
#include <pthread.h>
#include <semaphore.h>
#include "session.h"
+#include "notification-thread.h"
-/*
- * The timer thread enqueues struct sessiond_rotation_timer objects in the list
- * and wake up the rotation thread. When the rotation thread wakes up, it
- * empties the queue.
- */
-struct rotation_thread_timer_queue {
- struct lttng_pipe *event_pipe;
- struct cds_list_head list;
- pthread_mutex_t lock;
-};
-
-struct rotation_thread_handle {
- /*
- * Read side of pipes used to communicate with the rotation thread.
- */
- /* Notification from the consumers */
- int ust32_consumer;
- int ust64_consumer;
- int kernel_consumer;
- /* quit pipe */
- int thread_quit_pipe;
+extern struct lttng_notification_channel *rotate_notification_channel;
- struct rotation_thread_timer_queue *rotation_timer_queue;
+enum rotation_thread_job_type {
+ ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
+ ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION
+};
- /* Access to the notification thread cmd_queue */
- struct notification_thread_handle *notification_thread_handle;
+struct rotation_thread_timer_queue;
+struct rotation_thread_handle;
- sem_t *notification_thread_ready;
-};
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void);
+void rotation_thread_timer_queue_destroy(
+ struct rotation_thread_timer_queue *queue);
struct rotation_thread_handle *rotation_thread_handle_create(
- struct lttng_pipe *ust32_channel_rotate_pipe,
- struct lttng_pipe *ust64_channel_rotate_pipe,
- struct lttng_pipe *kernel_channel_rotate_pipe,
int thread_quit_pipe,
struct rotation_thread_timer_queue *rotation_timer_queue,
struct notification_thread_handle *notification_thread_handle,
void rotation_thread_handle_destroy(
struct rotation_thread_handle *handle);
+void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
+ enum rotation_thread_job_type job_type, uint64_t session_id);
+
void *thread_rotation(void *data);
#endif /* ROTATION_THREAD_H */
goto error;
}
- new_session->rotate_pending = false;
+ new_session->rotation_pending_local = false;
+ new_session->rotation_pending_relay = false;
new_session->rotation_state = LTTNG_ROTATION_STATE_NO_ROTATION;
- new_session->rotate_pending_relay = false;
- new_session->rotate_relay_pending_timer_enabled = false;
- new_session->rotate_timer = false;
+
+ new_session->rotation_pending_check_timer_enabled = false;
+ new_session->rotation_schedule_timer_enabled = false;
/* Add new session to the session list */
session_lock_list();
*/
uint64_t current_archive_id;
/*
- * Rotation is pending between the time it starts until the consumer has
- * finished extracting the data. If the session uses a relay, data related
- * to a rotation can still be in flight after that, see
- * rotate_pending_relay.
- */
- bool rotate_pending;
- /*
- * True until the relay has finished the rotation of all the streams.
+ * Rotation is considered pending between the time it is launched up
+ * until the moment when the data has been writen at the destination
+ * and the trace archive has been renamed.
+ *
+ * When tracing locally, only 'rotation_pending_local' is used since
+ * no remote checks are needed. However, when tracing to a relay daemon,
+ * a second check is needed to ensure that the data has been
+ * commited at the remote destination.
*/
- bool rotate_pending_relay;
+ bool rotation_pending_local;
+ bool rotation_pending_relay;
/* Current state of a rotation. */
enum lttng_rotation_state rotation_state;
- /*
- * Number of channels waiting for a rotation.
- * When this number reaches 0, we can handle the rename of the chunk
- * folder and inform the client that the rotate is finished.
- */
- unsigned int nr_chan_rotate_pending;
struct {
/*
* When the rotation is in progress, the temporary path name is
*/
time_t current_chunk_start_ts;
/*
- * Timer to check periodically if a relay has completed the last
- * rotation.
+ * Timer to check periodically if a relay and/or consumer has completed
+ * the last rotation.
*/
- bool rotate_relay_pending_timer_enabled;
- timer_t rotate_relay_pending_timer;
+ bool rotation_pending_check_timer_enabled;
+ timer_t rotation_pending_check_timer;
/* Timer to periodically rotate a session. */
- bool rotate_timer_enabled;
- timer_t rotate_timer;
+ bool rotation_schedule_timer_enabled;
+ timer_t rotation_schedule_timer;
/* Value for periodic rotations, 0 if disabled. */
uint64_t rotate_timer_period;
/* Value for size-based rotations, 0 if disabled. */
/*
* Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
#include "health-sessiond.h"
#include "rotation-thread.h"
+#define LTTNG_SESSIOND_SIG_QS SIGRTMIN + 10
+#define LTTNG_SESSIOND_SIG_EXIT SIGRTMIN + 11
+#define LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK SIGRTMIN + 12
+#define LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION SIGRTMIN + 13
+
+#define UINT_TO_PTR(value) \
+ ({ \
+ assert(value <= UINTPTR_MAX); \
+ (void *) (uintptr_t) value; \
+ })
+#define PTR_TO_UINT(ptr) ((uintptr_t) ptr)
+
+/*
+ * Handle timer teardown race wrt memory free of private data by sessiond
+ * signals are handled by a single thread, which permits a synchronization
+ * point between handling of each signal. Internal lock ensures mutual
+ * exclusion.
+ */
static
-struct timer_signal_data timer_signal = {
+struct timer_signal_data {
+ /* Thread managing signals. */
+ pthread_t tid;
+ int qs_done;
+ pthread_mutex_t lock;
+} timer_signal = {
.tid = 0,
.qs_done = 0,
.lock = PTHREAD_MUTEX_INITIALIZER,
if (ret) {
PERROR("sigemptyset");
}
- ret = sigaddset(mask, LTTNG_SESSIOND_SIG_TEARDOWN);
+ ret = sigaddset(mask, LTTNG_SESSIOND_SIG_QS);
if (ret) {
PERROR("sigaddset teardown");
}
if (ret) {
PERROR("sigaddset exit");
}
- ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+ ret = sigaddset(mask, LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK);
if (ret) {
- PERROR("sigaddset switch");
+ PERROR("sigaddset pending rotation check");
}
- ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_TIMER);
+ ret = sigaddset(mask, LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION);
if (ret) {
- PERROR("sigaddset switch");
+ PERROR("sigaddset scheduled rotation");
}
}
/*
- * This is the same function as consumer_timer_signal_thread_qs, when it
+ * This is the same function as timer_signal_thread_qs, when it
* returns, it means that no timer signr is currently pending or being handled
* by the timer thread. This cannot be called from the timer thread.
*/
static
-void sessiond_timer_signal_thread_qs(unsigned int signr)
+void timer_signal_thread_qs(unsigned int signr)
{
sigset_t pending_set;
int ret;
cmm_smp_mb();
/*
- * Kill with LTTNG_SESSIOND_SIG_TEARDOWN, so signal management thread
+ * Kill with LTTNG_SESSIOND_SIG_QS, so signal management thread
* wakes up.
*/
- kill(getpid(), LTTNG_SESSIOND_SIG_TEARDOWN);
+ kill(getpid(), LTTNG_SESSIOND_SIG_QS);
while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
caa_cpu_relax();
* a positive value if no timer was created (not an error).
*/
static
-int session_timer_start(timer_t *timer_id, struct ltt_session *session,
+int timer_start(timer_t *timer_id, uint64_t session_id,
unsigned int timer_interval_us, int signal, bool one_shot)
{
int ret = 0, delete_ret;
struct sigevent sev;
struct itimerspec its;
- assert(session);
-
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = signal;
- sev.sigev_value.sival_ptr = session;
- ret = timer_create(CLOCKID, &sev, timer_id);
+ sev.sigev_value.sival_ptr = UINT_TO_PTR(session_id);
+ ret = timer_create(CLOCK_MONOTONIC, &sev, timer_id);
if (ret == -1) {
PERROR("timer_create");
goto end;
}
static
-int session_timer_stop(timer_t *timer_id, int signal)
+int timer_stop(timer_t *timer_id, int signal)
{
int ret = 0;
goto end;
}
- sessiond_timer_signal_thread_qs(signal);
+ timer_signal_thread_qs(signal);
*timer_id = 0;
end:
return ret;
}
-int sessiond_timer_rotate_pending_start(struct ltt_session *session,
+int timer_session_rotation_pending_check_start(struct ltt_session *session,
unsigned int interval_us)
{
int ret;
- DBG("Enabling rotate pending timer on session %" PRIu64, session->id);
+ DBG("Enabling session rotation pending check timer on session %" PRIu64,
+ session->id);
/*
* We arm this timer in a one-shot mode so we don't have to disable it
- * explicitly (which could deadlock if the timer thread is blocked writing
- * in the rotation_timer_pipe).
+ * explicitly (which could deadlock if the timer thread is blocked
+ * writing in the rotation_timer_pipe).
+ *
* Instead, we re-arm it if needed after the rotation_pending check as
- * returned. Also, this timer is usually only needed once, so there is no
- * need to go through the whole signal teardown scheme everytime.
+ * returned. Also, this timer is usually only needed once, so there is
+ * no need to go through the whole signal teardown scheme everytime.
*/
- ret = session_timer_start(&session->rotate_relay_pending_timer,
- session, interval_us,
- LTTNG_SESSIOND_SIG_ROTATE_PENDING,
+ ret = timer_start(&session->rotation_pending_check_timer,
+ session->id, interval_us,
+ LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK,
/* one-shot */ true);
if (ret == 0) {
- session->rotate_relay_pending_timer_enabled = true;
+ session->rotation_pending_check_timer_enabled = true;
}
return ret;
}
/*
- * Stop and delete the channel's live timer.
- * Called with session and session_list locks held.
+ * Call with session and session_list locks held.
*/
-int sessiond_timer_rotate_pending_stop(struct ltt_session *session)
+int timer_session_rotation_pending_check_stop(struct ltt_session *session)
{
int ret;
assert(session);
- DBG("Disabling timer rotate pending on session %" PRIu64, session->id);
- ret = session_timer_stop(&session->rotate_relay_pending_timer,
- LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+ DBG("Disabling session rotation pending check timer on session %" PRIu64,
+ session->id);
+ ret = timer_stop(&session->rotation_pending_check_timer,
+ LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK);
if (ret == -1) {
- ERR("Failed to stop rotate_pending timer");
+ ERR("Failed to stop rotate_pending_check timer");
} else {
- session->rotate_relay_pending_timer_enabled = false;
+ session->rotation_pending_check_timer_enabled = false;
}
return ret;
}
-int sessiond_rotate_timer_start(struct ltt_session *session,
+/*
+ * Call with session and session_list locks held.
+ */
+int timer_session_rotation_schedule_timer_start(struct ltt_session *session,
unsigned int interval_us)
{
int ret;
- DBG("Enabling rotation timer on session \"%s\" (%ui µs)", session->name,
+ DBG("Enabling scheduled rotation timer on session \"%s\" (%ui µs)", session->name,
interval_us);
- ret = session_timer_start(&session->rotate_timer, session, interval_us,
- LTTNG_SESSIOND_SIG_ROTATE_TIMER, false);
+ ret = timer_start(&session->rotation_schedule_timer, session->id,
+ interval_us, LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION,
+ /* one-shot */ false);
if (ret < 0) {
goto end;
}
- session->rotate_timer_enabled = true;
+ session->rotation_schedule_timer_enabled = true;
end:
return ret;
}
/*
- * Stop and delete the channel's live timer.
+ * Call with session and session_list locks held.
*/
-int sessiond_rotate_timer_stop(struct ltt_session *session)
+int timer_session_rotation_schedule_timer_stop(struct ltt_session *session)
{
int ret = 0;
assert(session);
- if (!session->rotate_timer_enabled) {
+ if (!session->rotation_schedule_timer_enabled) {
goto end;
}
- DBG("Disabling rotation timer on session %s", session->name);
- ret = session_timer_stop(&session->rotate_timer,
- LTTNG_SESSIOND_SIG_ROTATE_TIMER);
+ DBG("Disabling scheduled rotation timer on session %s", session->name);
+ ret = timer_stop(&session->rotation_schedule_timer,
+ LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION);
if (ret < 0) {
- ERR("Failed to stop rotate timer of session \"%s\"",
+ ERR("Failed to stop scheduled rotation timer of session \"%s\"",
session->name);
goto end;
}
- session->rotate_timer_enabled = false;
+ session->rotation_schedule_timer_enabled = false;
ret = 0;
end:
return ret;
* Block the RT signals for the entire process. It must be called from the
* sessiond main before creating the threads
*/
-int sessiond_timer_signal_init(void)
+int timer_signal_init(void)
{
int ret;
sigset_t mask;
return 0;
}
-/*
- * Called with the rotation_timer_queue lock held.
- * Return true if the same timer job already exists in the queue, false if not.
- */
-static
-bool check_duplicate_timer_job(struct timer_thread_parameters *ctx,
- struct ltt_session *session, unsigned int signal)
-{
- bool ret = false;
- struct sessiond_rotation_timer *node;
-
- rcu_read_lock();
- cds_list_for_each_entry(node, &ctx->rotation_timer_queue->list, head) {
- if (node->session_id == session->id && node->signal == signal) {
- ret = true;
- goto end;
- }
- }
-
-end:
- rcu_read_unlock();
- return ret;
-}
-
-/*
- * Add the session ID and signal value to the rotation_timer_queue if it is
- * not already there and wakeup the rotation thread. The rotation thread
- * empties the whole queue everytime it is woken up. The event_pipe is
- * non-blocking, if it would block, we just return because we know the
- * rotation thread will be awaken anyway.
- */
-static
-int enqueue_timer_rotate_job(struct timer_thread_parameters *ctx,
- struct ltt_session *session, unsigned int signal)
-{
- int ret;
- char *c = "!";
- struct sessiond_rotation_timer *timer_data = NULL;
-
- pthread_mutex_lock(&ctx->rotation_timer_queue->lock);
- if (check_duplicate_timer_job(ctx, session, signal)) {
- /*
- * This timer job is already pending, we don't need to add
- * it.
- */
- ret = 0;
- goto end;
- }
-
- timer_data = zmalloc(sizeof(struct sessiond_rotation_timer));
- if (!timer_data) {
- PERROR("Allocation of timer data");
- ret = -1;
- goto end;
- }
- timer_data->session_id = session->id;
- timer_data->signal = signal;
- cds_list_add_tail(&timer_data->head,
- &ctx->rotation_timer_queue->list);
-
- ret = lttng_write(
- lttng_pipe_get_writefd(ctx->rotation_timer_queue->event_pipe),
- c, 1);
- if (ret < 0) {
- /*
- * We do not want to block in the timer handler, the job has been
- * enqueued in the list, the wakeup pipe is probably full, the job
- * will be processed when the rotation_thread catches up.
- */
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- ret = 0;
- goto end;
- }
- PERROR("Timer wakeup rotation thread");
- goto end;
- }
-
- ret = 0;
-
-end:
- pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
- return ret;
-}
-
-/*
- * Ask the rotation thread to check if the last rotation started in this
- * session is still pending on the relay.
- */
-static
-void relay_rotation_pending_timer(struct timer_thread_parameters *ctx,
- int sig, siginfo_t *si)
-{
- struct ltt_session *session = si->si_value.sival_ptr;
-
- assert(session);
-
- (void) enqueue_timer_rotate_job(ctx, session,
- LTTNG_SESSIOND_SIG_ROTATE_PENDING);
-}
-
-/*
- * Handle the LTTNG_SESSIOND_SIG_ROTATE_TIMER timer. Add the session ID to
- * the rotation_timer_queue so the rotation thread can trigger a new rotation
- * on that session.
- */
-static
-void rotate_timer(struct timer_thread_parameters *ctx, int sig, siginfo_t *si)
-{
- int ret;
- /*
- * The session cannot be freed/destroyed while we are running this
- * signal handler.
- */
- struct ltt_session *session = si->si_value.sival_ptr;
- assert(session);
-
- ret = enqueue_timer_rotate_job(ctx, session, LTTNG_SESSIOND_SIG_ROTATE_TIMER);
- if (ret) {
- PERROR("wakeup rotate pipe");
- }
-}
-
/*
* This thread is the sighandler for the timer signals.
*/
-void *sessiond_timer_thread(void *data)
+void *timer_thread_func(void *data)
{
int signr;
sigset_t mask;
rcu_thread_online();
health_register(health_sessiond, HEALTH_SESSIOND_TYPE_TIMER);
-
health_code_update();
/* Only self thread will receive signal mask. */
PERROR("sigwaitinfo");
}
continue;
- } else if (signr == LTTNG_SESSIOND_SIG_TEARDOWN) {
+ } else if (signr == LTTNG_SESSIOND_SIG_QS) {
cmm_smp_mb();
CMM_STORE_SHARED(timer_signal.qs_done, 1);
cmm_smp_mb();
- DBG("Signal timer metadata thread teardown");
} else if (signr == LTTNG_SESSIOND_SIG_EXIT) {
goto end;
- } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
- relay_rotation_pending_timer(ctx, info.si_signo, &info);
- } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
- rotate_timer(ctx, info.si_signo, &info);
+ } else if (signr == LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK) {
+ rotation_thread_enqueue_job(ctx->rotation_thread_job_queue,
+ ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION,
+ /* session_id */ PTR_TO_UINT(info.si_value.sival_ptr));
+ } else if (signr == LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION) {
+ rotation_thread_enqueue_job(ctx->rotation_thread_job_queue,
+ ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
+ /* session_id */ PTR_TO_UINT(info.si_value.sival_ptr));
} else {
ERR("Unexpected signal %d\n", info.si_signo);
}
rcu_unregister_thread();
return NULL;
}
+
+void timer_exit(void)
+{
+ kill(getpid(), LTTNG_SESSIOND_SIG_EXIT);
+}
/*
* Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
#include "session.h"
-#define LTTNG_SESSIOND_SIG_TEARDOWN SIGRTMIN + 10
-#define LTTNG_SESSIOND_SIG_EXIT SIGRTMIN + 11
-#define LTTNG_SESSIOND_SIG_ROTATE_PENDING SIGRTMIN + 12
-#define LTTNG_SESSIOND_SIG_ROTATE_TIMER SIGRTMIN + 13
-
-#define CLOCKID CLOCK_MONOTONIC
-
-/*
- * Handle timer teardown race wrt memory free of private data by sessiond
- * signals are handled by a single thread, which permits a synchronization
- * point between handling of each signal. Internal lock ensures mutual
- * exclusion.
- */
-struct timer_signal_data {
- /* Thread managing signals. */
- pthread_t tid;
- int qs_done;
- pthread_mutex_t lock;
-};
-
struct timer_thread_parameters {
- struct rotation_thread_timer_queue *rotation_timer_queue;
+ struct rotation_thread_timer_queue *rotation_thread_job_queue;
};
-struct sessiond_rotation_timer {
- uint64_t session_id;
- unsigned int signal;
- /* List member in struct rotation_thread_timer_queue. */
- struct cds_list_head head;
-};
+int timer_signal_init(void);
+void *timer_thread_func(void *data);
-void *sessiond_timer_thread(void *data);
-int sessiond_timer_signal_init(void);
+void timer_exit(void);
-int sessiond_timer_rotate_pending_start(struct ltt_session *session,
+/* Start a session's rotation pending check timer (one-shot mode). */
+int timer_session_rotation_pending_check_start(struct ltt_session *session,
unsigned int interval_us);
-int sessiond_timer_rotate_pending_stop(struct ltt_session *session);
+/* Stop a session's rotation pending check timer. */
+int timer_session_rotation_pending_check_stop(struct ltt_session *session);
-int sessiond_rotate_timer_start(struct ltt_session *session,
+/* Start a session's rotation schedule timer. */
+int timer_session_rotation_schedule_timer_start(struct ltt_session *session,
unsigned int interval_us);
-int sessiond_rotate_timer_stop(struct ltt_session *session);
+/* Stop a session's rotation schedule timer. */
+int timer_session_rotation_schedule_timer_stop(struct ltt_session *session);
#endif /* SESSIOND_TIMER_H */
*
* Return 0 on success or else a negative value.
*/
-int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
+int ust_app_rotate_session(struct ltt_session *session)
{
int ret = 0;
struct lttng_ht_iter iter;
goto error;
}
- /*
- * Account the metadata channel first to make sure the
- * number of channels waiting for a rotation cannot
- * reach 0 before we complete the iteration over all
- * the channels.
- */
- ret = rotate_add_channel_pending(
- reg->registry->reg.ust->metadata_key,
- LTTNG_DOMAIN_UST, session);
- if (ret < 0) {
- ret = reg->bits_per_long == 32 ?
- -LTTNG_ERR_UST_CONSUMER32_FAIL :
- -LTTNG_ERR_UST_CONSUMER64_FAIL;
- goto error;
- }
-
ret = snprintf(pathname, sizeof(pathname),
DEFAULT_UST_TRACE_DIR "/" DEFAULT_UST_TRACE_UID_PATH,
reg->uid, reg->bits_per_long);
/* Rotate the data channels. */
cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
reg_chan, node.node) {
- ret = rotate_add_channel_pending(
- reg_chan->consumer_key,
- LTTNG_DOMAIN_UST, session);
- if (ret < 0) {
- ret = reg->bits_per_long == 32 ?
- -LTTNG_ERR_UST_CONSUMER32_FAIL :
- -LTTNG_ERR_UST_CONSUMER64_FAIL;
- goto error;
- }
ret = consumer_rotate_channel(socket,
reg_chan->consumer_key,
usess->uid, usess->gid,
usess->consumer, pathname,
/* is_metadata_channel */ false,
- session->current_archive_id,
- &session->rotate_pending_relay);
+ session->current_archive_id);
if (ret < 0) {
goto error;
}
usess->uid, usess->gid,
usess->consumer, pathname,
/* is_metadata_channel */ true,
- session->current_archive_id,
- &session->rotate_pending_relay);
+ session->current_archive_id);
if (ret < 0) {
goto error;
}
- *ust_active = true;
}
break;
}
goto error;
}
- /*
- * Account the metadata channel first to make sure the
- * number of channels waiting for a rotation cannot
- * reach 0 before we complete the iteration over all
- * the channels.
- */
- ret = rotate_add_channel_pending(registry->metadata_key,
- LTTNG_DOMAIN_UST, session);
- if (ret < 0) {
- ret = app->bits_per_long == 32 ?
- -LTTNG_ERR_UST_CONSUMER32_FAIL :
- -LTTNG_ERR_UST_CONSUMER64_FAIL;
- goto error;
- }
/* Rotate the data channels. */
cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
ua_chan, node.node) {
- ret = rotate_add_channel_pending(
- ua_chan->key, LTTNG_DOMAIN_UST,
- session);
- if (ret < 0) {
- ret = app->bits_per_long == 32 ?
- -LTTNG_ERR_UST_CONSUMER32_FAIL :
- -LTTNG_ERR_UST_CONSUMER64_FAIL;
- goto error;
- }
ret = consumer_rotate_channel(socket, ua_chan->key,
ua_sess->euid, ua_sess->egid,
ua_sess->consumer, pathname,
/* is_metadata_channel */ false,
- session->current_archive_id,
- &session->rotate_pending_relay);
+ session->current_archive_id);
if (ret < 0) {
goto error;
}
ua_sess->euid, ua_sess->egid,
ua_sess->consumer, pathname,
/* is_metadata_channel */ true,
- session->current_archive_id,
- &session->rotate_pending_relay);
+ session->current_archive_id);
if (ret < 0) {
goto error;
}
- *ust_active = true;
}
break;
}
struct consumer_output *consumer,
int overwrite, uint64_t *discarded, uint64_t *lost);
int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess);
-int ust_app_rotate_session(struct ltt_session *session, bool *ust_active);
+int ust_app_rotate_session(struct ltt_session *session);
static inline
int ust_app_supported(void)
}
static inline
-int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
+int ust_app_rotate_session(struct ltt_session *session)
{
return 0;
}
rcu_read_unlock();
}
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
- uint64_t key)
-{
- ssize_t ret;
-
- do {
- ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- PERROR("Failed to write to the channel rotation pipe");
- } else {
- DBG("Sent channel rotation notification for channel key %"
- PRIu64, key);
- ret = 0;
- }
-
- return (int) ret;
-}
-
/*
* Perform operations that need to be done after a stream has
* rotated and released the stream lock.
abort();
}
- if (--stream->chan->nr_stream_rotate_pending == 0) {
- DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
- stream->chan->name);
- ret = rotate_notify_sessiond(ctx, stream->chan->key);
- }
pthread_mutex_unlock(&stream->chan->lock);
-
return ret;
}
} else {
ret = rotate_local_stream(ctx, stream);
}
+ stream->trace_archive_id++;
if (ret < 0) {
- ERR("Rotate stream");
+ ERR("Failed to rotate stream, ret = %i", ret);
goto error;
}
}
}
-int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+/* Stream lock must be acquired by the caller. */
+static
+bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream,
+ uint64_t session_id, uint64_t chunk_id)
+{
+ bool pending = false;
+
+ if (stream->session_id != session_id) {
+ /* Skip. */
+ goto end;
+ }
+
+ /*
+ * If the stream's archive_id belongs to the chunk being rotated (or an
+ * even older one), it means that the consumer has not consumed all the
+ * buffers that belong to the chunk being rotated. Therefore, the
+ * rotation is considered as ongoing/pending.
+ */
+ pending = stream->trace_archive_id <= chunk_id;
+end:
+ return pending;
+}
+
+/* RCU read lock must be acquired by the caller. */
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+ uint64_t chunk_id)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ bool rotation_pending = false;
+
+ /* Start with the metadata streams... */
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ pthread_mutex_lock(&stream->lock);
+ rotation_pending = check_stream_rotation_pending(stream,
+ session_id, chunk_id);
+ pthread_mutex_unlock(&stream->lock);
+ if (rotation_pending) {
+ goto end;
+ }
+ }
+
+ /* ... followed by the data streams. */
+ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ pthread_mutex_lock(&stream->lock);
+ rotation_pending = check_stream_rotation_pending(stream,
+ session_id, chunk_id);
+ pthread_mutex_unlock(&stream->lock);
+ if (rotation_pending) {
+ goto end;
+ }
+ }
+
+end:
+ return !!rotation_pending;
+}
+
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
uint64_t relayd_id, uint64_t chunk_id)
{
int ret;
relayd = consumer_find_relayd(relayd_id);
if (!relayd) {
- ERR("Failed to find relayd");
+ ERR("Failed to find relayd id %" PRIu64, relayd_id);
ret = -1;
goto end;
}
* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* 2012 - David Goulet <dgoulet@efficios.com>
+ * 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
LTTNG_CONSUMER_LOST_PACKETS,
LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
- LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE,
LTTNG_CONSUMER_ROTATE_CHANNEL,
LTTNG_CONSUMER_ROTATE_RENAME,
- LTTNG_CONSUMER_ROTATE_PENDING_RELAY,
+ LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL,
+ LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY,
LTTNG_CONSUMER_MKDIR,
};
/* Copy of the sequence number of the last packet extracted. */
uint64_t last_sequence_number;
/*
- * Session's current trace archive id at the time of the creation of
- * this stream.
+ * A stream is created with a trace_archive_id matching the session's
+ * current trace archive id at the time of the creation of the stream.
+ * It is incremented when the rotate_position is reached.
*/
uint64_t trace_archive_id;
/*
* to the session daemon (write-only).
*/
int channel_monitor_pipe;
- /*
- * Pipe used to inform the session daemon that a stream has finished
- * its rotation (write-only).
- */
- int channel_rotate_pipe;
};
/*
/* Flag used to temporarily pause data consumption from testpoints. */
extern int data_consumption_paused;
+/* Return a human-readable consumer type string that is suitable for logging. */
+static inline
+const char *lttng_consumer_type_str(enum lttng_consumer_type type)
+{
+ switch (type) {
+ case LTTNG_CONSUMER_UNKNOWN:
+ return "unknown";
+ case LTTNG_CONSUMER_KERNEL:
+ return "kernel";
+ case LTTNG_CONSUMER32_UST:
+ return "32-bit user space";
+ case LTTNG_CONSUMER64_UST:
+ return "64-bit user space";
+ default:
+ abort();
+ }
+}
+
/*
* Init consumer data structures.
*/
struct lttng_consumer_local_data *ctx);
int lttng_consumer_rotate_rename(const char *current_path, const char *new_path,
uid_t uid, gid_t gid, uint64_t relayd_id);
-int lttng_consumer_rotate_pending_relay( uint64_t session_id,
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+ uint64_t chunk_id);
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
uint64_t relayd_id, uint64_t chunk_id);
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
* Default timer value in usec for the rotate pending polling check on the
* relay when a rotation has completed on the consumer.
*/
-#define DEFAULT_ROTATE_PENDING_RELAY_TIMER CONFIG_DEFAULT_ROTATE_PENDING_RELAY_TIMER
+#define DEFAULT_ROTATE_PENDING_TIMER CONFIG_DEFAULT_ROTATE_PENDING_TIMER
/*
* Returns the default subbuf size.
}
break;
}
- case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
- {
- int channel_rotate_pipe;
- int flags;
-
- ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- /* Successfully received the command's type. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- goto error_fatal;
- }
-
- ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1);
- if (ret != (ssize_t) sizeof(channel_rotate_pipe)) {
- ERR("Failed to receive channel rotate pipe");
- goto error_fatal;
- }
-
- DBG("Received channel rotate pipe (%d)", channel_rotate_pipe);
- ctx->channel_rotate_pipe = channel_rotate_pipe;
- /* Set the pipe as non-blocking. */
- ret = fcntl(channel_rotate_pipe, F_GETFL, 0);
- if (ret == -1) {
- PERROR("fcntl get flags of the channel rotate pipe");
- goto error_fatal;
- }
- flags = ret;
-
- ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK);
- if (ret == -1) {
- PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe");
- goto error_fatal;
- }
- DBG("Channel rotate pipe set as non-blocking");
- ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- goto error_fatal;
- }
- break;
- }
case LTTNG_CONSUMER_ROTATE_CHANNEL:
{
DBG("Consumer rotate channel %" PRIu64, msg.u.rotate_channel.key);
}
break;
}
- case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+ case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL:
{
int pending;
uint32_t pending_reply;
- DBG("Consumer rotate pending on relay for session %" PRIu64,
- msg.u.rotate_pending_relay.session_id);
- pending = lttng_consumer_rotate_pending_relay(
- msg.u.rotate_pending_relay.session_id,
- msg.u.rotate_pending_relay.relayd_id,
- msg.u.rotate_pending_relay.chunk_id);
+ DBG("Perform local check of pending rotation for session id %" PRIu64,
+ msg.u.check_rotation_pending_local.session_id);
+ pending = lttng_consumer_check_rotation_pending_local(
+ msg.u.check_rotation_pending_local.session_id,
+ msg.u.check_rotation_pending_local.chunk_id);
if (pending < 0) {
- ERR("Rotate pending relay failed");
+ ERR("Local rotation pending check failed with code %i", pending);
ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
} else {
pending_reply = !!pending;
ret = lttcomm_send_unix_sock(sock, &pending_reply,
sizeof(pending_reply));
if (ret < 0) {
- PERROR("send data pending ret code");
+ PERROR("Failed to send rotation pending return code");
+ goto error_fatal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY:
+ {
+ int pending;
+ uint32_t pending_reply;
+
+ DBG("Perform relayd check of pending rotation for session id %" PRIu64,
+ msg.u.check_rotation_pending_relay.session_id);
+ pending = lttng_consumer_check_rotation_pending_relay(
+ msg.u.check_rotation_pending_relay.session_id,
+ msg.u.check_rotation_pending_relay.relayd_id,
+ msg.u.check_rotation_pending_relay.chunk_id);
+ if (pending < 0) {
+ ERR("Relayd rotation pending check failed with code %i", pending);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ pending_reply = !!pending;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ if (pending < 0) {
+ /*
+ * An error occured while running the command;
+ * don't send the 'pending' flag as the sessiond
+ * will not read it.
+ */
+ break;
+ }
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending_reply,
+ sizeof(pending_reply));
+ if (ret < 0) {
+ PERROR("Failed to send rotation pending return code");
goto error_fatal;
}
break;
* operation.
*/
struct lttcomm_consumer_msg {
- uint32_t cmd_type; /* enum consumerd_command */
+ uint32_t cmd_type; /* enum lttng_consumer_command */
union {
struct {
uint64_t channel_key;
uint32_t uid;
uint32_t gid;
} LTTNG_PACKED rotate_rename;
+ struct {
+ uint64_t session_id;
+ uint64_t chunk_id;
+ } LTTNG_PACKED check_rotation_pending_local;
struct {
uint64_t relayd_id;
uint64_t session_id;
uint64_t chunk_id;
- } LTTNG_PACKED rotate_pending_relay;
+ } LTTNG_PACKED check_rotation_pending_relay;
struct {
char path[LTTNG_PATH_MAX];
uint64_t relayd_id; /* Relayd id if apply. */
}
goto end_msg_sessiond;
}
- case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
- {
- int channel_rotate_pipe;
- int flags;
-
- ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- /* Successfully received the command's type. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- goto error_fatal;
- }
-
- ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1);
- if (ret != sizeof(channel_rotate_pipe)) {
- ERR("Failed to receive channel rotate pipe");
- goto error_fatal;
- }
-
- DBG("Received channel rotate pipe (%d)", channel_rotate_pipe);
- ctx->channel_rotate_pipe = channel_rotate_pipe;
- /* Set the pipe as non-blocking. */
- ret = fcntl(channel_rotate_pipe, F_GETFL, 0);
- if (ret == -1) {
- PERROR("fcntl get flags of the channel rotate pipe");
- goto error_fatal;
- }
- flags = ret;
-
- ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK);
- if (ret == -1) {
- PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe");
- goto error_fatal;
- }
- DBG("Channel rotate pipe set as non-blocking");
- ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- goto error_fatal;
- }
- break;
- }
case LTTNG_CONSUMER_ROTATE_CHANNEL:
{
/*
}
break;
}
- case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+ case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL:
{
int pending;
uint32_t pending_reply;
- DBG("Consumer rotate pending on relay for session %" PRIu64,
- msg.u.rotate_pending_relay.session_id);
- pending = lttng_consumer_rotate_pending_relay(
- msg.u.rotate_pending_relay.session_id,
- msg.u.rotate_pending_relay.relayd_id,
- msg.u.rotate_pending_relay.chunk_id);
+ DBG("Perform local check of pending rotation for session id %" PRIu64,
+ msg.u.check_rotation_pending_local.session_id);
+ pending = lttng_consumer_check_rotation_pending_local(
+ msg.u.check_rotation_pending_local.session_id,
+ msg.u.check_rotation_pending_local.chunk_id);
if (pending < 0) {
- ERR("Rotate pending relay failed");
- ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ ERR("Local rotation pending check failed with code %i", pending);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
} else {
pending_reply = !!pending;
}
health_code_update();
- /* Send whether the command was successful. */
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
if (pending < 0) {
/*
* An error occured while running the command;
- * don't send the 'pending' reply as the sessiond
+ * don't send the 'pending' flag as the sessiond
* will not read it.
*/
break;
}
- /* Send back the command's payload (pending reply). */
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending_reply,
+ sizeof(pending_reply));
+ if (ret < 0) {
+ PERROR("Failed to send rotation pending return code");
+ goto error_fatal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY:
+ {
+ int pending;
+ uint32_t pending_reply;
+
+ DBG("Perform relayd check of pending rotation for session id %" PRIu64,
+ msg.u.check_rotation_pending_relay.session_id);
+ pending = lttng_consumer_check_rotation_pending_relay(
+ msg.u.check_rotation_pending_relay.session_id,
+ msg.u.check_rotation_pending_relay.relayd_id,
+ msg.u.check_rotation_pending_relay.chunk_id);
+ if (pending < 0) {
+ ERR("Relayd rotation pending check failed with code %i", pending);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ pending_reply = !!pending;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ if (pending < 0) {
+ /*
+ * An error occured while running the command;
+ * don't send the 'pending' flag as the sessiond
+ * will not read it.
+ */
+ break;
+ }
+
+ /* Send back returned value to session daemon */
ret = lttcomm_send_unix_sock(sock, &pending_reply,
sizeof(pending_reply));
if (ret < 0) {
- PERROR("send data pending ret code");
+ PERROR("Failed to send rotation pending return code");
goto error_fatal;
}
break;
static struct ltt_ust_session *usess;
static struct lttng_domain dom;
-/*
- * Stub to prevent an undefined reference in this test without having to link
- * the entire tree because of a cascade of dependencies. This is not used,
- * it is just there to prevent GCC from complaining.
- */
-int rotate_add_channel_pending(uint64_t key, enum lttng_domain_type domain,
- struct ltt_session *session)
-{
- ERR("Stub called instead of the real function");
- abort();
- return -1;
-}
-
/*
* Return random string of 10 characters.
* Not thread-safe.