From: Pierre-Marc Fournier Date: Tue, 3 Mar 2009 17:12:02 +0000 (-0500) Subject: ust: make lttd work X-Git-Tag: v1.9.1~1040 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=3a7b90de71f2a82f73f06fb14a7b77805aea1064;p=lttng-ust.git ust: make lttd work - for now, trace must be stopped and destroyed prior to program end --- diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..b3c70aa9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*.so +*.swp +*.o +*.swo diff --git a/libtracectl/tracectl.c b/libtracectl/tracectl.c index 3732bc03..917937f2 100644 --- a/libtracectl/tracectl.c +++ b/libtracectl/tracectl.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "marker.h" #include "tracer.h" @@ -23,6 +24,8 @@ char consumer_stack[10000]; +struct list_head blocked_consumers = LIST_HEAD_INIT(blocked_consumers); + static struct ustcomm_app ustcomm_app; struct tracecmd { /* no padding */ @@ -49,6 +52,22 @@ struct consumer_channel { struct ltt_channel_struct *chan; }; +struct blocked_consumer { + int fd_consumer; + int fd_producer; + int tmp_poll_idx; + + /* args to ustcomm_send_reply */ + struct ustcomm_server server; + struct ustcomm_source src; + + /* args to ltt_do_get_subbuf */ + struct rchan_buf *rbuf; + struct ltt_channel_buf_struct *lttbuf; + + struct list_head list; +}; + int consumer(void *arg) { int result; @@ -184,6 +203,87 @@ static int inform_consumer_daemon(void) ustcomm_request_consumer(getpid(), "ust"); } +void process_blocked_consumers(void) +{ + int n_fds = 0; + struct pollfd *fds; + struct blocked_consumer *bc; + int idx = 0; + char inbuf; + int result; + + list_for_each_entry(bc, &blocked_consumers, list) { + n_fds++; + } + + fds = (struct pollfd *) malloc(n_fds * sizeof(struct pollfd)); + if(fds == NULL) { + ERR("malloc returned NULL"); + return; + } + + list_for_each_entry(bc, &blocked_consumers, list) { + fds[idx].fd = bc->fd_producer; + fds[idx].events = POLLIN; + bc->tmp_poll_idx = idx; + idx++; + } + + result = poll(fds, n_fds, 0); + if(result == -1) { + PERROR("poll"); + return -1; + } + + list_for_each_entry(bc, &blocked_consumers, list) { + if(fds[bc->tmp_poll_idx].revents) { + long consumed_old = 0; + char *reply; + + result = read(bc->fd_producer, &inbuf, 1); + if(result == -1) { + PERROR("read"); + continue; + } + if(result == 0) { + DBG("PRODUCER END"); + + close(bc->fd_producer); + + __list_del(bc->list.prev, bc->list.next); + + result = ustcomm_send_reply(&bc->server, "END", &bc->src); + if(result < 0) { + ERR("ustcomm_send_reply failed"); + continue; + } + + continue; + } + + result = ltt_do_get_subbuf(bc->rbuf, bc->lttbuf, &consumed_old); + if(result == -EAGAIN) { + WARN("missed buffer?"); + continue; + } + else if(result < 0) { + DBG("ltt_do_get_subbuf: error: %s", strerror(-result)); + } + asprintf(&reply, "%s %ld", "OK", consumed_old); + result = ustcomm_send_reply(&bc->server, reply, &bc->src); + if(result < 0) { + ERR("ustcomm_send_reply failed"); + free(reply); + continue; + } + free(reply); + + __list_del(bc->list.prev, bc->list.next); + } + } + +} + int listener_main(void *p) { int result; @@ -200,11 +300,17 @@ int listener_main(void *p) int len; struct ustcomm_source src; - result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src, -1); - if(result <= 0) { + process_blocked_consumers(); + + result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src, 5); + if(result < 0) { WARN("error in ustcomm_app_recv_message"); continue; } + else if(result == 0) { + /* no message */ + continue; + } DBG("received a message! it's: %s\n", recvbuf); len = strlen(recvbuf); @@ -435,24 +541,22 @@ int listener_main(void *p) struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; char *reply; long consumed_old=0; + int fd; + struct blocked_consumer *bc; - result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old); - if(result < 0) { - DBG("ltt_do_get_subbuf: error: %s", strerror(-result)); - asprintf(&reply, "%s %ld", "UNAVAIL", 0); - } - else { - DBG("ltt_do_get_subbuf: success"); - asprintf(&reply, "%s %ld", "OK", consumed_old); - } - - result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); - if(result) { - ERR("listener: get_subbuf: ustcomm_send_reply failed"); + bc = (struct blocked_consumer *) malloc(sizeof(struct blocked_consumer)); + if(bc == NULL) { + ERR("malloc returned NULL"); goto next_cmd; } + bc->fd_consumer = src.fd; + bc->fd_producer = lttbuf->data_ready_fd_read; + bc->rbuf = rbuf; + bc->lttbuf = lttbuf; + bc->src = src; + bc->server = ustcomm_app.server; - free(reply); + list_add(&bc->list, &blocked_consumers); break; } @@ -528,55 +632,55 @@ int listener_main(void *p) free(channel_name); free(consumed_old_str); } - else if(nth_token_is(recvbuf, "get_notifications", 0) == 1) { - struct ltt_trace_struct *trace; - char trace_name[] = "auto"; - int i; - char *channel_name; - - DBG("get_notifications"); - - channel_name = strdup_malloc(nth_token(recvbuf, 1)); - if(channel_name == NULL) { - ERR("put_subbuf_size: cannot parse channel"); - goto next_cmd; - } - - ltt_lock_traces(); - trace = _ltt_trace_find(trace_name); - ltt_unlock_traces(); - - if(trace == NULL) { - CPRINTF("cannot find trace!"); - return 1; - } - - for(i=0; inr_channels; i++) { - struct rchan *rchan = trace->channels[i].trans_channel_data; - int fd; - - if(!strcmp(trace->channels[i].channel_name, channel_name)) { - struct rchan_buf *rbuf = rchan->buf; - struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; - - result = fd = ustcomm_app_detach_client(&ustcomm_app, &src); - if(result == -1) { - ERR("ustcomm_app_detach_client failed"); - goto next_cmd; - } - - lttbuf->wake_consumer_arg = (void *) fd; - - smp_wmb(); - - lttbuf->call_wake_consumer = 1; - - break; - } - } - - free(channel_name); - } +// else if(nth_token_is(recvbuf, "get_notifications", 0) == 1) { +// struct ltt_trace_struct *trace; +// char trace_name[] = "auto"; +// int i; +// char *channel_name; +// +// DBG("get_notifications"); +// +// channel_name = strdup_malloc(nth_token(recvbuf, 1)); +// if(channel_name == NULL) { +// ERR("put_subbuf_size: cannot parse channel"); +// goto next_cmd; +// } +// +// ltt_lock_traces(); +// trace = _ltt_trace_find(trace_name); +// ltt_unlock_traces(); +// +// if(trace == NULL) { +// CPRINTF("cannot find trace!"); +// return 1; +// } +// +// for(i=0; inr_channels; i++) { +// struct rchan *rchan = trace->channels[i].trans_channel_data; +// int fd; +// +// if(!strcmp(trace->channels[i].channel_name, channel_name)) { +// struct rchan_buf *rbuf = rchan->buf; +// struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; +// +// result = fd = ustcomm_app_detach_client(&ustcomm_app, &src); +// if(result == -1) { +// ERR("ustcomm_app_detach_client failed"); +// goto next_cmd; +// } +// +// lttbuf->wake_consumer_arg = (void *) fd; +// +// smp_wmb(); +// +// lttbuf->call_wake_consumer = 1; +// +// break; +// } +// } +// +// free(channel_name); +// } else { ERR("unable to parse message: %s", recvbuf); } @@ -687,39 +791,12 @@ static void auto_probe_connect(struct marker *m) DBG("just auto connected marker %s %s to probe default", m->channel, m->name); } -/* Wake the consumer of a buffer - * - * wake_consumer_cb is called in tracing context so it must haste. - * - * FIXME: don't do a system call here; maybe schedule work to be done - * in listener context? Once this is done in listener context, we can - * check for the return value of send_message_fd and remove the fd if necessary - * - * @arg: the buffer - * @finished: 0: subbuffer full; 1: buffer being destroyed - */ - -static void wake_consumer_cb(void *arg, int finished) -{ - struct ltt_channel_buf_struct *ltt_buf = (struct ltt_channel_buf_struct *) arg; - int fd = (int)ACCESS_ONCE(arg); - - if(!finished) { - send_message_fd(fd, "consume", NULL); - } - else { - send_message_fd(fd, "destroyed", NULL); - } -} - static void __attribute__((constructor(101))) init0() { DBG("UST_AUTOPROBE constructor"); if(getenv("UST_AUTOPROBE")) { marker_set_new_marker_cb(auto_probe_connect); } - - relay_set_wake_consumer(wake_consumer_cb); } static void fini(void); diff --git a/libtracing/relay.c b/libtracing/relay.c index 0133f3e3..88bdfefc 100644 --- a/libtracing/relay.c +++ b/libtracing/relay.c @@ -892,9 +892,13 @@ static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx, struct ltt_channel_struct *channel = (struct ltt_channel_struct *)buf->chan->private_data; struct ltt_channel_buf_struct *ltt_buf = channel->buf; + int result; - if(ltt_buf->call_wake_consumer) - relay_wake_consumer(ACCESS_ONCE(ltt_buf->wake_consumer_arg), 0); + result = write(ltt_buf->data_ready_fd_write, "1", 1); + if(result == -1) { + PERROR("write (in ltt_relay_buffer_flush)"); + ERR("this should never happen!"); + } //ust// atomic_set(<t_buf->wakeup_readers, 1); } @@ -1456,6 +1460,8 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, { struct ltt_channel_buf_struct *ltt_buf = ltt_chan->buf; unsigned int j; + int fds[2]; + int result; ltt_buf->commit_count = zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs); @@ -1480,8 +1486,13 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, local_set(<t_buf->events_lost, 0); local_set(<t_buf->corrupted_subbuffers, 0); - ltt_buf->call_wake_consumer = 0; - ltt_buf->wake_consumer_arg = NULL; + result = pipe(fds); + if(result == -1) { + PERROR("pipe"); + return -1; + } + ltt_buf->data_ready_fd_read = fds[0]; + ltt_buf->data_ready_fd_write = fds[1]; return 0; } @@ -1593,11 +1604,16 @@ static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf) struct ltt_channel_struct *channel = (struct ltt_channel_struct *)buf->chan->private_data; struct ltt_channel_buf_struct *ltt_buf = channel->buf; + int result; buf->finalized = 1; ltt_force_switch(buf, FORCE_FLUSH); - relay_wake_consumer(ltt_buf, 1); + result = write(ltt_buf->data_ready_fd_write, "1", 1); + if(result == -1) { + PERROR("write (in ltt_relay_buffer_flush)"); + ERR("this should never happen!"); + } } static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel) @@ -1619,11 +1635,20 @@ static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel) static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel) { struct rchan *rchan = ltt_channel->trans_channel_data; + int result; if (rchan->buf) { struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf; ltt_relay_buffer_flush(rchan->buf); //ust// ltt_relay_wake_writers(ltt_buf); + /* closing the pipe tells the consumer the buffer is finished */ + + //result = write(ltt_buf->data_ready_fd_write, "D", 1); + //if(result == -1) { + // PERROR("write (in ltt_relay_finish_buffer)"); + // ERR("this should never happen!"); + //} + close(ltt_buf->data_ready_fd_write); } } diff --git a/libtracing/relay.h b/libtracing/relay.h index 6cfcb077..cd3baae3 100644 --- a/libtracing/relay.h +++ b/libtracing/relay.h @@ -345,10 +345,14 @@ struct ltt_channel_buf_struct { //ust// * writers //ust// */ //ust// atomic_t wakeup_readers; /* Boolean : wakeup readers waiting ? */ - /* whether or not wake_consumer must be called; must be accessed atomically */ - int call_wake_consumer; - /* the arg to pass to wake_consumer; must be accessed atomically */ - void *wake_consumer_arg; + /* one byte is written to this pipe when data is available, in order + to wake the consumer */ + /* portability: Single byte writes must be as quick as possible. The kernel-side + buffer must be large enough so the writer doesn't block. From the pipe(7) + man page: Since linux 2.6.11, the pipe capacity is 65536 bytes. */ + int data_ready_fd_write; + /* the reading end of the pipe */ + int data_ready_fd_read; } ____cacheline_aligned; int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old); diff --git a/libustcomm/ustcomm.c b/libustcomm/ustcomm.c index 2b08b310..4fe46c53 100644 --- a/libustcomm/ustcomm.c +++ b/libustcomm/ustcomm.c @@ -201,7 +201,7 @@ int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_ int result; result = send_message_fd(src->fd, msg, NULL); - if(result) { + if(result < 0) { ERR("error in send_message_fd"); return -1; } diff --git a/libustcomm/ustcomm.h b/libustcomm/ustcomm.h index e661a551..0de4ad46 100644 --- a/libustcomm/ustcomm.h +++ b/libustcomm/ustcomm.h @@ -11,6 +11,7 @@ struct ustcomm_connection { int fd; }; +/* ustcomm_server must be shallow-copyable */ struct ustcomm_server { /* the "server" socket for serving the external requests */ int listen_fd; @@ -27,6 +28,7 @@ struct ustcomm_app { struct ustcomm_server server; }; +/* ustcomm_source must be shallow-copyable */ struct ustcomm_source { int fd; void *priv; diff --git a/share/kernelcompat.h b/share/kernelcompat.h index b7c119bf..a18460e0 100644 --- a/share/kernelcompat.h +++ b/share/kernelcompat.h @@ -131,8 +131,6 @@ static int atomic_read(atomic_t *p) #define atomic_long_set atomic_set #define atomic_long_read atomic_read -#include "asm.h" - //#define __xg(x) ((volatile long *)(x)) #define cmpxchg(ptr, o, n) \ diff --git a/ustd/Makefile b/ustd/Makefile index 503b7e4a..0de51f87 100644 --- a/ustd/Makefile +++ b/ustd/Makefile @@ -1,6 +1,6 @@ all: ustd ustd: ustd.c - gcc -g -Wall -I ../libustcomm -I. -I ../../../../libkcompat -o ustd ustd.c ../libustcomm/ustcomm.c + gcc -g -Wall -I ../libustcomm -I. -I ../../../../libkcompat -lpthread -o ustd ustd.c ../libustcomm/ustcomm.c .PHONY: ustd diff --git a/ustd/ustd.c b/ustd/ustd.c index 19981191..8205ed2b 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -32,86 +33,10 @@ struct buffer_info { long consumed_old; }; -int add_buffer(pid_t pid, char *bufname) -{ - struct buffer_info *buf; - char *send_msg; - char *received_msg; - int result; - char *tmp; - int fd; - - buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); - if(buf == NULL) { - ERR("add_buffer: insufficient memory"); - return -1; - } - - buf->name = bufname; - buf->pid = pid; - - /* get shmid */ - asprintf(&send_msg, "get_shmid %s", buf->name); - send_message(pid, send_msg, &received_msg); - free(send_msg); - DBG("got buffer name %s", buf->name); - - result = sscanf(received_msg, "%d", &buf->shmid); - if(result != 1) { - ERR("unable to parse response to get_shmid"); - return -1; - } - free(received_msg); - DBG("got shmid %d", buf->shmid); - - /* get n_subbufs */ - asprintf(&send_msg, "get_n_subbufs %s", buf->name); - send_message(pid, send_msg, &received_msg); - free(send_msg); - - result = sscanf(received_msg, "%d", &buf->n_subbufs); - if(result != 1) { - ERR("unable to parse response to get_n_subbufs"); - return -1; - } - free(received_msg); - DBG("got n_subbufs %d", buf->n_subbufs); - - /* get subbuf size */ - asprintf(&send_msg, "get_subbuf_size %s", buf->name); - send_message(pid, send_msg, &received_msg); - free(send_msg); - - result = sscanf(received_msg, "%d", &buf->subbuf_size); - if(result != 1) { - ERR("unable to parse response to get_subbuf_size"); - return -1; - } - free(received_msg); - DBG("got subbuf_size %d", buf->subbuf_size); - - /* attach memory */ - buf->mem = shmat(buf->shmid, NULL, 0); - if(buf->mem == (void *) 0) { - perror("shmat"); - return -1; - } - DBG("successfully attached memory"); - - /* open file for output */ - asprintf(&tmp, "/tmp/trace/%s_0", buf->name); - result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600); - if(result == -1) { - PERROR("open"); - return -1; - } - buf->file_fd = fd; - free(tmp); - - list_add(&buf->list, &buffers); - - return 0; -} +/* return value: 0 = subbuffer is finished, it won't produce data anymore + * 1 = got subbuffer successfully + * <0 = error + */ int get_subbuffer(struct buffer_info *buf) { @@ -130,21 +55,27 @@ int get_subbuffer(struct buffer_info *buf) free(send_msg); result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); - if(result != 2) { + if(result != 2 && result != 1) { ERR("unable to parse response to get_subbuffer"); return -1; } - free(received_msg); + + DBG("received msg is %s", received_msg); if(!strcmp(rep_code, "OK")) { DBG("got subbuffer %s", buf->name); retval = 1; } + else if(nth_token_is(received_msg, "END", 0) == 1) { + return 0; + } else { - DBG("did not get subbuffer %s", buf->name); - retval = 0; + DBG("error getting subbuffer %s", buf->name); + retval = -1; } + /* FIMXE: free correctly the stuff */ + free(received_msg); free(rep_code); return retval; } @@ -205,6 +136,125 @@ ssize_t patient_write(int fd, const void *buf, size_t count) return bufc-(const char *)buf; } +void *consumer_thread(void *arg) +{ + struct buffer_info *buf = (struct buffer_info *) arg; + int result; + + for(;;) { + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } + if(result == 0) { + /* this is done */ + break; + } + + /* write data to file */ + result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size); + if(result == -1) { + PERROR("write"); + /* FIXME: maybe drop this trace */ + } + + result = put_subbuffer(buf); + if(result == -1) { + ERR("error putting subbuffer"); + break; + } + } + + DBG("thread for buffer %s is stopping", buf->name); + + return NULL; +} + +int add_buffer(pid_t pid, char *bufname) +{ + struct buffer_info *buf; + char *send_msg; + char *received_msg; + int result; + char *tmp; + int fd; + pthread_t thr; + + buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); + if(buf == NULL) { + ERR("add_buffer: insufficient memory"); + return -1; + } + + buf->name = bufname; + buf->pid = pid; + + /* get shmid */ + asprintf(&send_msg, "get_shmid %s", buf->name); + send_message(pid, send_msg, &received_msg); + free(send_msg); + DBG("got buffer name %s", buf->name); + + result = sscanf(received_msg, "%d", &buf->shmid); + if(result != 1) { + ERR("unable to parse response to get_shmid"); + return -1; + } + free(received_msg); + DBG("got shmid %d", buf->shmid); + + /* get n_subbufs */ + asprintf(&send_msg, "get_n_subbufs %s", buf->name); + send_message(pid, send_msg, &received_msg); + free(send_msg); + + result = sscanf(received_msg, "%d", &buf->n_subbufs); + if(result != 1) { + ERR("unable to parse response to get_n_subbufs"); + return -1; + } + free(received_msg); + DBG("got n_subbufs %d", buf->n_subbufs); + + /* get subbuf size */ + asprintf(&send_msg, "get_subbuf_size %s", buf->name); + send_message(pid, send_msg, &received_msg); + free(send_msg); + + result = sscanf(received_msg, "%d", &buf->subbuf_size); + if(result != 1) { + ERR("unable to parse response to get_subbuf_size"); + return -1; + } + free(received_msg); + DBG("got subbuf_size %d", buf->subbuf_size); + + /* attach memory */ + buf->mem = shmat(buf->shmid, NULL, 0); + if(buf->mem == (void *) 0) { + perror("shmat"); + return -1; + } + DBG("successfully attached memory"); + + /* open file for output */ + asprintf(&tmp, "/tmp/trace/%s_0", buf->name); + result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600); + if(result == -1) { + PERROR("open"); + return -1; + } + buf->file_fd = fd; + free(tmp); + + //list_add(&buf->list, &buffers); + + pthread_create(&thr, NULL, consumer_thread, buf); + + return 0; +} + int main(int argc, char **argv) { struct ustcomm_ustd ustd; @@ -219,9 +269,8 @@ int main(int argc, char **argv) /* app loop */ for(;;) { char *recvbuf; - struct buffer_info *buf; - /* 1. check for requests on our public socket */ + /* check for requests on our public socket */ result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100); if(result == -1) { ERR("error in ustcomm_ustd_recv_message"); @@ -247,30 +296,6 @@ int main(int argc, char **argv) free(recvbuf); } - - /* 2. try to consume data from tracing apps */ - list_for_each_entry(buf, &buffers, list) { - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; - } - if(result == 0) - continue; - - /* write data to file */ - //result = write(buf->file_fd, buf->, ); - result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size); - if(result == -1) { - PERROR("write"); - /* FIXME: maybe drop this trace */ - } - - result = put_subbuffer(buf); - if(result == -1) { - ERR("error putting subbuffer"); - } - } } return 0;