static pthread_t dispatch_thread;
static pthread_t agent_reg_thread;
static pthread_t load_session_thread;
-static pthread_t rotation_thread;
static pthread_t timer_thread;
/*
/* Am I root or not. Set to 1 if the daemon is running as root */
static int is_root;
-/* Rotation thread handle. */
-static struct rotation_thread_handle *rotation_thread_handle;
-
/*
* Stop all threads by closing the thread quit pipe.
*/
struct lttng_pipe *ust32_channel_monitor_pipe = NULL,
*ust64_channel_monitor_pipe = NULL,
*kernel_channel_monitor_pipe = NULL;
- bool rotation_thread_launched = false;
bool timer_thread_launched = false;
struct lttng_thread *ht_cleanup_thread = NULL;
struct timer_thread_parameters timer_thread_ctx;
+ /* Rotation thread handle. */
+ struct rotation_thread_handle *rotation_thread_handle = NULL;
/* Queue of rotation jobs populated by the sessiond-timer. */
struct rotation_thread_timer_queue *rotation_timer_queue = NULL;
}
/* Create rotation thread. */
- ret = pthread_create(&rotation_thread, default_pthread_attr(),
- thread_rotation, rotation_thread_handle);
- if (ret) {
- errno = ret;
- PERROR("pthread_create rotation");
+ if (!launch_rotation_thread(rotation_thread_handle)) {
retval = -1;
- stop_threads();
goto exit_rotation;
}
- rotation_thread_launched = true;
/* Create thread to manage the client socket */
ret = pthread_create(&client_thread, default_pthread_attr(),
*/
rcu_barrier();
- if (rotation_thread_handle) {
- if (rotation_thread_launched) {
- ret = pthread_join(rotation_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join rotation thread");
- retval = -1;
- }
- }
- rotation_thread_handle_destroy(rotation_thread_handle);
- }
-
if (timer_thread_launched) {
timer_exit();
ret = pthread_join(timer_thread, &status);
lttng_thread_put(ht_cleanup_thread);
}
+ rcu_thread_offline();
+ rcu_unregister_thread();
+
+ if (rotation_thread_handle) {
+ rotation_thread_handle_destroy(rotation_thread_handle);
+ }
+
/*
* After the rotation and timer thread have quit, we can safely destroy
* the rotation_timer_queue.
*/
rotation_thread_timer_queue_destroy(rotation_timer_queue);
-
- rcu_thread_offline();
- rcu_unregister_thread();
-
/*
* The teardown of the notification system is performed after the
* session daemon's teardown in order to allow it to be notified
#include "session.h"
#include "timer.h"
#include "notification-thread-commands.h"
+#include "utils.h"
+#include "thread.h"
#include <urcu.h>
#include <urcu/list.h>
struct rotation_thread_timer_queue *rotation_timer_queue;
/* Access to the notification thread cmd_queue */
struct notification_thread_handle *notification_thread_handle;
+ /* Thread-specific quit pipe. */
+ struct lttng_pipe *quit_pipe;
};
static
void rotation_thread_timer_queue_destroy(
struct rotation_thread_timer_queue *queue)
{
- struct rotation_thread_job *job, *tmp_job;
-
if (!queue) {
return;
}
lttng_pipe_destroy(queue->event_pipe);
pthread_mutex_lock(&queue->lock);
- /* Empty wait queue. */
- cds_list_for_each_entry_safe(job, tmp_job, &queue->list, head) {
- log_job_destruction(job);
- cds_list_del(&job->head);
- free(job);
- }
+ assert(cds_list_empty(&queue->list));
pthread_mutex_unlock(&queue->lock);
pthread_mutex_destroy(&queue->lock);
free(queue);
void rotation_thread_handle_destroy(
struct rotation_thread_handle *handle)
{
+ lttng_pipe_destroy(handle->quit_pipe);
free(handle);
}
handle->rotation_timer_queue = rotation_timer_queue;
handle->notification_thread_handle = notification_thread_handle;
+ handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
+ if (!handle->quit_pipe) {
+ goto error;
+ }
end:
return handle;
+error:
+ rotation_thread_handle_destroy(handle);
+ return NULL;
}
/*
int ret;
/*
- * Create pollset with size 2:
- * - quit pipe,
+ * Create pollset with size 3:
+ * - rotation thread quit pipe,
* - rotation thread timer queue pipe,
+ * - notification channel sock,
*/
- ret = sessiond_set_thread_pollset(poll_set, 2);
- if (ret) {
+ ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
+ if (ret < 0) {
goto error;
}
+
+ ret = lttng_poll_add(poll_set,
+ lttng_pipe_get_readfd(handle->quit_pipe),
+ LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ ERR("[rotation-thread] Failed to add quit pipe read fd to poll set");
+ goto error;
+ }
+
ret = lttng_poll_add(poll_set,
lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+ ERR("[rotation-thread] Failed to add rotate_pending fd to poll set");
goto error;
}
int ret;
struct rotation_thread_handle *handle = data;
struct rotation_thread thread;
+ const int queue_pipe_fd = lttng_pipe_get_readfd(
+ handle->rotation_timer_queue->event_pipe);
DBG("[rotation-thread] Started rotation thread");
}
} else {
/* Job queue or quit pipe activity. */
- if (fd == lttng_pipe_get_readfd(
- handle->rotation_timer_queue->event_pipe)) {
- char buf;
-
- ret = lttng_read(fd, &buf, 1);
- if (ret != 1) {
- ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
- ret = -1;
- goto error;
- }
- }
/*
* The job queue is serviced if there is
goto error;
}
- if (sessiond_check_thread_quit_pipe(fd, revents)) {
+ if (fd == queue_pipe_fd) {
+ char buf;
+
+ ret = lttng_read(fd, &buf, 1);
+ if (ret != 1) {
+ ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+ ret = -1;
+ goto error;
+ }
+ } else {
DBG("[rotation-thread] Quit pipe activity");
goto exit;
}
end:
return NULL;
}
+
+static
+bool shutdown_rotation_thread(void *thread_data)
+{
+ struct rotation_thread_handle *handle = thread_data;
+ const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
+
+ return notify_thread_pipe(write_fd) == 1;
+}
+
+bool launch_rotation_thread(struct rotation_thread_handle *handle)
+{
+ struct lttng_thread *thread;
+
+ thread = lttng_thread_create("Rotation",
+ thread_rotation,
+ shutdown_rotation_thread,
+ NULL,
+ handle);
+ if (!thread) {
+ goto error;
+ }
+ lttng_thread_put(thread);
+ return true;
+error:
+ return false;
+}