Fix: code refactoring of viewer streams in relayd
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 4b059815d56dea0ac60b3a615be026db4041f9af..166e473e998ecd4d135afcb91a3b6592b73ca8bc 100644 (file)
@@ -59,6 +59,8 @@
 #include "lttng-relayd.h"
 #include "live.h"
 #include "health-relayd.h"
+#include "testpoint.h"
+#include "viewer-stream.h"
 
 /* command line options */
 char *opt_output_path;
@@ -85,7 +87,7 @@ const char *tracing_group_name = DEFAULT_TRACING_GROUP;
  * Quit pipe for all threads. This permits a single cancellation point
  * for all threads when receiving an event on the pipe.
  */
-static int thread_quit_pipe[2] = { -1, -1 };
+int thread_quit_pipe[2] = { -1, -1 };
 
 /*
  * This pipe is used to inform the worker thread that a command is queued and
@@ -165,12 +167,13 @@ int parse_args(int argc, char **argv)
                { "help", 0, 0, 'h', },
                { "output", 1, 0, 'o', },
                { "verbose", 0, 0, 'v', },
+               { "background", 0, 0, 'b' },
                { NULL, 0, 0, 0, },
        };
 
        while (1) {
                int option_index = 0;
-               c = getopt_long(argc, argv, "dhv" "C:D:L:o:g:",
+               c = getopt_long(argc, argv, "dhv" "C:D:L:o:g:b",
                                long_options, &option_index);
                if (c == -1) {
                        break;
@@ -216,6 +219,9 @@ int parse_args(int argc, char **argv)
                case 'd':
                        opt_daemon = 1;
                        break;
+               case 'b':
+                       opt_background = 1;
+                       break;
                case 'g':
                        tracing_group_name = optarg;
                        break;
@@ -613,6 +619,10 @@ void *relay_thread_listener(void *data)
 
        lttng_relay_notify_ready();
 
+       if (testpoint(relayd_thread_listener)) {
+               goto error_testpoint;
+       }
+
        while (1) {
                health_code_update();
 
@@ -713,6 +723,7 @@ restart:
 exit:
 error:
 error_poll_add:
+error_testpoint:
        lttng_poll_clean(&events);
 error_create_poll:
        if (data_sock->fd >= 0) {
@@ -756,6 +767,10 @@ void *relay_thread_dispatcher(void *data)
 
        health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
 
+       if (testpoint(relayd_thread_dispatcher)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
@@ -802,6 +817,7 @@ void *relay_thread_dispatcher(void *data)
        err = 0;
 
 error:
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -883,7 +899,7 @@ static void destroy_stream(struct relay_stream *stream)
                }
        }
 
-       vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+       vstream = viewer_stream_find_by_id(stream->stream_handle);
        if (vstream) {
                /*
                 * Set the last good value into the viewer stream. This is done
@@ -1009,11 +1025,16 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
        session->sock = cmd->sock;
        session->minor = cmd->minor;
        session->major = cmd->major;
+       pthread_mutex_init(&session->viewer_ready_lock, NULL);
        cmd->session = session;
 
        reply.session_id = htobe64(session->id);
 
        switch (cmd->minor) {
+               case 1:
+               case 2:
+               case 3:
+                       break;
                case 4: /* LTTng sessiond 2.4 */
                default:
                        ret = cmd_create_session_2_4(cmd, session);
@@ -1052,6 +1073,8 @@ void set_viewer_ready_flag(struct relay_command *cmd)
 {
        struct relay_stream_recv_handle *node, *tmp_node;
 
+       pthread_mutex_lock(&cmd->session->viewer_ready_lock);
+
        cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
                struct relay_stream *stream;
 
@@ -1074,6 +1097,7 @@ void set_viewer_ready_flag(struct relay_command *cmd)
                free(node);
        }
 
+       pthread_mutex_unlock(&cmd->session->viewer_ready_lock);
        return;
 }
 
@@ -1195,11 +1219,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
         * stream message is received, this list is emptied and streams are set
         * with the viewer ready flag.
         */
-       if (stream->metadata_flag) {
-               stream->viewer_ready = 1;
-       } else {
-               queue_stream_handle(stream->stream_handle, cmd);
-       }
+       queue_stream_handle(stream->stream_handle, cmd);
 
        lttng_ht_node_init_ulong(&stream->stream_n,
                        (unsigned long) stream->stream_handle);
@@ -1977,6 +1997,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
         */
        set_viewer_ready_flag(cmd);
 
+       /*
+        * Inform the viewer that there are new streams in the session.
+        */
+       uatomic_set(&cmd->session->new_streams, 1);
+
        reply.ret_code = htobe32(LTTNG_OK);
        send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
        if (send_ret < 0) {
@@ -2219,7 +2244,7 @@ int relay_process_data(struct relay_command *cmd)
                                (stream->oldest_tracefile_id + 1) %
                                stream->tracefile_count;
                }
-               vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+               vstream = viewer_stream_find_by_id(stream->stream_handle);
                if (vstream) {
                        /*
                         * The viewer is reading a file about to be
@@ -2420,6 +2445,10 @@ void *relay_thread_worker(void *data)
 
        health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
 
+       if (testpoint(relayd_thread_worker)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        /* table of connections indexed on socket */
@@ -2681,6 +2710,7 @@ relay_connections_ht_error:
        }
        DBG("Worker thread cleanup complete");
        free(data_buffer);
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2859,7 +2889,7 @@ int main(int argc, char **argv)
                goto exit_listener;
        }
 
-       ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe);
+       ret = live_start_threads(live_uri, relay_ctx);
        if (ret != 0) {
                ERR("Starting live viewer threads");
                goto exit_live;
This page took 0.025535 seconds and 4 git commands to generate.