X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=libtracectl%2Ftracectl.c;h=2101d5c4c3725e8de547fbf815237cbff988a3b3;hb=e70acc91ae21023149204a245638c1d2dffa7265;hp=d041e6bb0ce3caef7ac72bea86fe677c8349df60;hpb=3847c3bab100bfb6b01b5654c2429a5d0d162ff5;p=lttng-ust.git diff --git a/libtracectl/tracectl.c b/libtracectl/tracectl.c index d041e6bb..2101d5c4 100644 --- a/libtracectl/tracectl.c +++ b/libtracectl/tracectl.c @@ -6,18 +6,16 @@ #include #include #include +#include #include "marker.h" #include "tracer.h" #include "localerr.h" #include "ustcomm.h" +#include "relay.h" /* FIXME: remove */ -#define USE_CLONE +//#define USE_CLONE -#define UNIX_PATH_MAX 108 - -#define SOCKETDIR "/tmp/socks" -#define SOCKETDIRLEN sizeof(SOCKETDIR) #define USTSIGNAL SIGIO #define MAX_MSG_SIZE (100) @@ -26,6 +24,8 @@ char consumer_stack[10000]; +struct list_head blocked_consumers = LIST_HEAD_INIT(blocked_consumers); + static struct ustcomm_app ustcomm_app; struct tracecmd { /* no padding */ @@ -47,135 +47,26 @@ struct trctl_msg { char payload[94]; }; -char mysocketfile[UNIX_PATH_MAX] = ""; -//int pfd = -1; - struct consumer_channel { int fd; struct ltt_channel_struct *chan; }; -int consumer(void *arg) -{ - int result; - int fd; - char str[] = "Hello, this is the consumer.\n"; - struct ltt_trace_struct *trace; - struct consumer_channel *consumer_channels; - int i; - char trace_name[] = "auto"; - - ltt_lock_traces(); - trace = _ltt_trace_find(trace_name); - ltt_unlock_traces(); - - if(trace == NULL) { - CPRINTF("cannot find trace!"); - return 1; - } +struct blocked_consumer { + int fd_consumer; + int fd_producer; + int tmp_poll_idx; - consumer_channels = (struct consumer_channel *) malloc(trace->nr_channels * sizeof(struct consumer_channel)); - if(consumer_channels == NULL) { - ERR("malloc returned NULL"); - return 1; - } - - CPRINTF("opening trace files"); - for(i=0; inr_channels; i++) { - char tmp[100]; - struct ltt_channel_struct *chan = &trace->channels[i]; + /* args to ustcomm_send_reply */ + struct ustcomm_server server; + struct ustcomm_source src; - consumer_channels[i].chan = chan; - - snprintf(tmp, sizeof(tmp), "trace/%s_0", chan->channel_name); - result = consumer_channels[i].fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00644); - if(result == -1) { - perror("open"); - return -1; - } - CPRINTF("\topened trace file %s", tmp); - - } - CPRINTF("done opening trace files"); - - for(;;) { - /*wait*/ - - for(i=0; inr_channels; i++) { - struct rchan *rchan = consumer_channels[i].chan->trans_channel_data; - struct rchan_buf *rbuf = rchan->buf; - struct ltt_channel_buf_struct *lttbuf = consumer_channels[i].chan->buf; - long consumed_old; - - result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old); - if(result < 0) { - DBG("ltt_do_get_subbuf: error: %s", strerror(-result)); - } - else { - DBG("success!"); - - result = write(consumer_channels[i].fd, rbuf->buf_data + (consumed_old & (2 * 4096-1)), 4096); - ltt_do_put_subbuf(rbuf, lttbuf, consumed_old); - } - } - - sleep(1); - } - -// CPRINTF("consumer: got a trace: %s with %d channels\n", trace_name, trace->nr_channels); -// -// struct ltt_channel_struct *chan = &trace->channels[0]; -// -// CPRINTF("channel 1 (%s) active=%u", chan->channel_name, chan->active & 1); + /* args to ltt_do_get_subbuf */ + struct rchan_buf *rbuf; + struct ltt_channel_buf_struct *lttbuf; -// struct rchan *rchan = chan->trans_channel_data; -// struct rchan_buf *rbuf = rchan->buf; -// struct ltt_channel_buf_struct *lttbuf = chan->buf; -// long consumed_old; -// -// result = fd = open("trace.out", O_WRONLY | O_CREAT | O_TRUNC, 00644); -// if(result == -1) { -// perror("open"); -// return -1; -// } - -// for(;;) { -// write(STDOUT_FILENO, str, sizeof(str)); -// -// result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old); -// if(result < 0) { -// CPRINTF("ltt_do_get_subbuf: error: %s", strerror(-result)); -// } -// else { -// CPRINTF("success!"); -// -// result = write(fd, rbuf->buf_data + (consumed_old & (2 * 4096-1)), 4096); -// ltt_do_put_subbuf(rbuf, lttbuf, consumed_old); -// } -// -// //CPRINTF("There seems to be %ld bytes available", SUBBUF_TRUNC(local_read(<tbuf->offset), rbuf->chan) - consumed_old); -// CPRINTF("Commit count %ld", local_read(<tbuf->commit_count[0])); -// -// -// sleep(1); -// } -} - -void start_consumer(void) -{ -#ifdef USE_CLONE - int result; - - result = clone(consumer, consumer_stack+sizeof(consumer_stack)-1, CLONE_FS | CLONE_FILES | CLONE_VM | CLONE_SIGHAND | CLONE_THREAD, NULL); - if(result == -1) { - perror("clone"); - } -#else - pthread_t thread; - - pthread_create(&thread, NULL, consumer, NULL); -#endif -} + struct list_head list; +}; static void print_markers(void) { @@ -222,14 +113,93 @@ void notif_cb(void) } } -#define CONSUMER_DAEMON_SOCK SOCKETDIR "/ustd" - static int inform_consumer_daemon(void) { ustcomm_request_consumer(getpid(), "metadata"); ustcomm_request_consumer(getpid(), "ust"); } +void process_blocked_consumers(void) +{ + int n_fds = 0; + struct pollfd *fds; + struct blocked_consumer *bc; + int idx = 0; + char inbuf; + int result; + + list_for_each_entry(bc, &blocked_consumers, list) { + n_fds++; + } + + fds = (struct pollfd *) malloc(n_fds * sizeof(struct pollfd)); + if(fds == NULL) { + ERR("malloc returned NULL"); + return; + } + + list_for_each_entry(bc, &blocked_consumers, list) { + fds[idx].fd = bc->fd_producer; + fds[idx].events = POLLIN; + bc->tmp_poll_idx = idx; + idx++; + } + + result = poll(fds, n_fds, 0); + if(result == -1) { + PERROR("poll"); + return -1; + } + + list_for_each_entry(bc, &blocked_consumers, list) { + if(fds[bc->tmp_poll_idx].revents) { + long consumed_old = 0; + char *reply; + + result = read(bc->fd_producer, &inbuf, 1); + if(result == -1) { + PERROR("read"); + continue; + } + if(result == 0) { + DBG("PRODUCER END"); + + close(bc->fd_producer); + + __list_del(bc->list.prev, bc->list.next); + + result = ustcomm_send_reply(&bc->server, "END", &bc->src); + if(result < 0) { + ERR("ustcomm_send_reply failed"); + continue; + } + + continue; + } + + result = ltt_do_get_subbuf(bc->rbuf, bc->lttbuf, &consumed_old); + if(result == -EAGAIN) { + WARN("missed buffer?"); + continue; + } + else if(result < 0) { + DBG("ltt_do_get_subbuf: error: %s", strerror(-result)); + } + asprintf(&reply, "%s %ld", "OK", consumed_old); + result = ustcomm_send_reply(&bc->server, reply, &bc->src); + if(result < 0) { + ERR("ustcomm_send_reply failed"); + free(reply); + continue; + } + free(reply); + + __list_del(bc->list.prev, bc->list.next); + } + } + +} + int listener_main(void *p) { int result; @@ -244,19 +214,22 @@ int listener_main(void *p) char trace_type[] = "ustrelay"; char *recvbuf; int len; + struct ustcomm_source src; - result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf); - DBG("HERE"); - if(result) { + process_blocked_consumers(); + + result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src, 5); + if(result < 0) { WARN("error in ustcomm_app_recv_message"); continue; } + else if(result == 0) { + /* no message */ + continue; + } DBG("received a message! it's: %s\n", recvbuf); len = strlen(recvbuf); - //if(len && recvbuf[len-1] == '\n') { - // recvbuf[len-1] = '\0'; - //} if(!strcmp(recvbuf, "print_markers")) { print_markers(); @@ -313,13 +286,20 @@ int listener_main(void *p) return; } } - else if(!strncmp(recvbuf, "get_shmid ", 10)) { + else if(nth_token_is(recvbuf, "get_shmid", 0) == 1) { struct ltt_trace_struct *trace; char trace_name[] = "auto"; int i; + char *channel_name; DBG("get_shmid"); + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_shmid: cannot parse channel"); + goto next_cmd; + } + ltt_lock_traces(); trace = _ltt_trace_find(trace_name); ltt_unlock_traces(); @@ -332,12 +312,299 @@ int listener_main(void *p) for(i=0; inr_channels; i++) { struct rchan *rchan = trace->channels[i].trans_channel_data; struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)rchan->private_data; + struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + char *reply; + + DBG("the shmid for the requested channel is %d", rbuf->shmid); + DBG("the shmid for its buffer structure is %d", ltt_channel->buf_shmid); + asprintf(&reply, "%d %d", rbuf->shmid, ltt_channel->buf_shmid); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_shmid: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "get_n_subbufs", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_n_subbufs"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_n_subbufs: cannot parse channel"); + goto next_cmd; + } - DBG("the shmid is %d", rbuf->shmid); + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + char *reply; + + DBG("the n_subbufs for the requested channel is %d", rchan->n_subbufs); + asprintf(&reply, "%d", rchan->n_subbufs); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_n_subbufs: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "get_subbuf_size", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_subbuf_size"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_subbuf_size: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + char *reply; + + DBG("the subbuf_size for the requested channel is %d", rchan->subbuf_size); + asprintf(&reply, "%d", rchan->subbuf_size); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_subbuf_size: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "load_probe_lib", 0) == 1) { + char *libfile; + + libfile = nth_token(recvbuf, 1); + + DBG("load_probe_lib loading %s", libfile); + } + else if(nth_token_is(recvbuf, "get_subbuffer", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_subbuf"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_subbuf: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; + char *reply; + long consumed_old=0; + int fd; + struct blocked_consumer *bc; + + bc = (struct blocked_consumer *) malloc(sizeof(struct blocked_consumer)); + if(bc == NULL) { + ERR("malloc returned NULL"); + goto next_cmd; + } + bc->fd_consumer = src.fd; + bc->fd_producer = lttbuf->data_ready_fd_read; + bc->rbuf = rbuf; + bc->lttbuf = lttbuf; + bc->src = src; + bc->server = ustcomm_app.server; + + list_add(&bc->list, &blocked_consumers); + + break; + } } } + else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + long consumed_old; + char *consumed_old_str; + char *endptr; + + DBG("put_subbuf"); + + channel_name = strdup_malloc(nth_token(recvbuf, 1)); + if(channel_name == NULL) { + ERR("put_subbuf_size: cannot parse channel"); + goto next_cmd; + } + + consumed_old_str = strdup_malloc(nth_token(recvbuf, 2)); + if(consumed_old_str == NULL) { + ERR("put_subbuf: cannot parse consumed_old"); + goto next_cmd; + } + consumed_old = strtol(consumed_old_str, &endptr, 10); + if(*endptr != '\0') { + ERR("put_subbuf: invalid value for consumed_old"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; + char *reply; + long consumed_old=0; + + result = ltt_do_put_subbuf(rbuf, lttbuf, consumed_old); + if(result < 0) { + WARN("ltt_do_put_subbuf: error"); + } + else { + DBG("ltt_do_put_subbuf: success"); + } + asprintf(&reply, "%s", "OK", consumed_old); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: put_subbuf: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + + free(channel_name); + free(consumed_old_str); + } +// else if(nth_token_is(recvbuf, "get_notifications", 0) == 1) { +// struct ltt_trace_struct *trace; +// char trace_name[] = "auto"; +// int i; +// char *channel_name; +// +// DBG("get_notifications"); +// +// channel_name = strdup_malloc(nth_token(recvbuf, 1)); +// if(channel_name == NULL) { +// ERR("put_subbuf_size: cannot parse channel"); +// goto next_cmd; +// } +// +// ltt_lock_traces(); +// trace = _ltt_trace_find(trace_name); +// ltt_unlock_traces(); +// +// if(trace == NULL) { +// CPRINTF("cannot find trace!"); +// return 1; +// } +// +// for(i=0; inr_channels; i++) { +// struct rchan *rchan = trace->channels[i].trans_channel_data; +// int fd; +// +// if(!strcmp(trace->channels[i].channel_name, channel_name)) { +// struct rchan_buf *rbuf = rchan->buf; +// struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; +// +// result = fd = ustcomm_app_detach_client(&ustcomm_app, &src); +// if(result == -1) { +// ERR("ustcomm_app_detach_client failed"); +// goto next_cmd; +// } +// +// lttbuf->wake_consumer_arg = (void *) fd; +// +// smp_wmb(); +// +// lttbuf->call_wake_consumer = 1; +// +// break; +// } +// } +// +// free(channel_name); +// } + else { + ERR("unable to parse message: %s", recvbuf); + } + + next_cmd: free(recvbuf); } } @@ -390,15 +657,15 @@ static int init_socket(void) static void destroy_socket(void) { - int result; - - if(mysocketfile[0] == '\0') - return; - - result = unlink(mysocketfile); - if(result == -1) { - PERROR("unlink"); - } +// int result; +// +// if(mysocketfile[0] == '\0') +// return; +// +// result = unlink(mysocketfile); +// if(result == -1) { +// PERROR("unlink"); +// } } static int init_signal_handler(void) @@ -546,7 +813,8 @@ static void __attribute__((destructor)) fini() } /* FIXME: wait for the consumer to be done */ - sleep(1); + //DBG("waiting 5 sec for consume"); + //sleep(5); destroy_socket(); }