From: Pierre-Marc Fournier Date: Thu, 26 Feb 2009 01:07:03 +0000 (-0500) Subject: ust: continue work on ustd X-Git-Tag: v0.1~266 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=688760ef257bee85e3841e0d27ecf4e2f921ef00;p=ust.git ust: continue work on ustd --- diff --git a/hello/hello.c b/hello/hello.c index 3bbb2bd..0533173 100644 --- a/hello/hello.c +++ b/hello/hello.c @@ -63,7 +63,7 @@ int main() printf("Hello, World!\n"); sleep(1); - for(i=0; i<5000; i++) { + for(i=0; i<50; i++) { trace_mark(ust, bar, "str %s", "FOOBAZ"); trace_mark(ust, bar2, "number1 %d number2 %d", 53, 9800); usleep(100000); @@ -71,6 +71,12 @@ int main() scanf("%*s"); + ltt_trace_stop("auto"); + ltt_trace_destroy("auto"); + + DBG("TRACE STOPPED"); + scanf("%*s"); + return 0; } diff --git a/libtracectl/tracectl.c b/libtracectl/tracectl.c index 176ee6a..c407be9 100644 --- a/libtracectl/tracectl.c +++ b/libtracectl/tracectl.c @@ -88,7 +88,7 @@ int consumer(void *arg) consumer_channels[i].chan = chan; snprintf(tmp, sizeof(tmp), "trace/%s_0", chan->channel_name); - result = consumer_channels[i].fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00644); + result = consumer_channels[i].fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600); if(result == -1) { perror("open"); return -1; @@ -121,44 +121,6 @@ int consumer(void *arg) sleep(1); } - -// CPRINTF("consumer: got a trace: %s with %d channels\n", trace_name, trace->nr_channels); -// -// struct ltt_channel_struct *chan = &trace->channels[0]; -// -// CPRINTF("channel 1 (%s) active=%u", chan->channel_name, chan->active & 1); - -// struct rchan *rchan = chan->trans_channel_data; -// struct rchan_buf *rbuf = rchan->buf; -// struct ltt_channel_buf_struct *lttbuf = chan->buf; -// long consumed_old; -// -// result = fd = open("trace.out", O_WRONLY | O_CREAT | O_TRUNC, 00644); -// if(result == -1) { -// perror("open"); -// return -1; -// } - -// for(;;) { -// write(STDOUT_FILENO, str, sizeof(str)); -// -// result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old); -// if(result < 0) { -// CPRINTF("ltt_do_get_subbuf: error: %s", strerror(-result)); -// } -// else { -// CPRINTF("success!"); -// -// result = write(fd, rbuf->buf_data + (consumed_old & (2 * 4096-1)), 4096); -// ltt_do_put_subbuf(rbuf, lttbuf, consumed_old); -// } -// -// //CPRINTF("There seems to be %ld bytes available", SUBBUF_TRUNC(local_read(<tbuf->offset), rbuf->chan) - consumed_old); -// CPRINTF("Commit count %ld", local_read(<tbuf->commit_count[0])); -// -// -// sleep(1); -// } } void start_consumer(void) @@ -246,18 +208,14 @@ int listener_main(void *p) int len; struct ustcomm_source src; - result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src); - DBG("HERE"); - if(result) { + result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src, -1); + if(result <= 0) { WARN("error in ustcomm_app_recv_message"); continue; } DBG("received a message! it's: %s\n", recvbuf); len = strlen(recvbuf); - //if(len && recvbuf[len-1] == '\n') { - // recvbuf[len-1] = '\0'; - //} if(!strcmp(recvbuf, "print_markers")) { print_markers(); @@ -454,6 +412,133 @@ int listener_main(void *p) DBG("load_probe_lib loading %s", libfile); } + else if(nth_token_is(recvbuf, "get_subbuffer", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_subbuf"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_subbuf: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; + char *reply; + long consumed_old=0; + + result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old); + if(result < 0) { + DBG("ltt_do_get_subbuf: error: %s", strerror(-result)); + asprintf(&reply, "%s %ld", "UNAVAIL", 0); + } + else { + DBG("ltt_do_get_subbuf: success"); + asprintf(&reply, "%s %ld", "OK", consumed_old); + } + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_subbuf: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + long consumed_old; + char *consumed_old_str; + char *endptr; + + DBG("put_subbuf"); + + channel_name = strdup_malloc(nth_token(recvbuf, 1)); + if(channel_name == NULL) { + ERR("put_subbuf_size: cannot parse channel"); + goto next_cmd; + } + + consumed_old_str = strdup_malloc(nth_token(recvbuf, 2)); + if(consumed_old_str == NULL) { + ERR("put_subbuf: cannot parse consumed_old"); + goto next_cmd; + } + consumed_old = strtol(consumed_old_str, &endptr, 10); + if(*endptr != '\0') { + ERR("put_subbuf: invalid value for consumed_old"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; + char *reply; + long consumed_old=0; + + result = ltt_do_put_subbuf(rbuf, lttbuf, consumed_old); + if(result < 0) { + WARN("ltt_do_put_subbuf: error"); + } + else { + DBG("ltt_do_put_subbuf: success"); + } + asprintf(&reply, "%s", "OK", consumed_old); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: put_subbuf: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + + free(channel_name); + free(consumed_old_str); + } + else { + ERR("unable to parse message: %s", recvbuf); + } next_cmd: free(recvbuf); @@ -664,7 +749,8 @@ static void __attribute__((destructor)) fini() } /* FIXME: wait for the consumer to be done */ - sleep(1); + DBG("waiting 5 sec for consume"); + sleep(5); destroy_socket(); } diff --git a/libustcomm/ustcomm.c b/libustcomm/ustcomm.c index e445ed9..6047b60 100644 --- a/libustcomm/ustcomm.c +++ b/libustcomm/ustcomm.c @@ -35,6 +35,20 @@ // backtrace_symbols_fd(buffer, result, STDERR_FILENO); //} +char *strdup_malloc(const char *s) +{ + char *retval; + + if(s == NULL) + return NULL; + + retval = (char *) malloc(strlen(s)+1); + + strcpy(retval, s); + + return retval; +} + static void signal_process(pid_t pid) { int result; @@ -57,9 +71,12 @@ int send_message_fd(int fd, const char *msg, char **reply) PERROR("send"); return -1; } + else if(result == 0) { + return 0; + } if(!reply) - return 0; + return 1; *reply = (char *) malloc(MSG_MAX+1); result = recv(fd, *reply, MSG_MAX, 0); @@ -67,10 +84,13 @@ int send_message_fd(int fd, const char *msg, char **reply) PERROR("recv"); return -1; } + else if(result == 0) { + return 0; + } (*reply)[result] = '\0'; - return 0; + return 1; } int send_message_path(const char *path, const char *msg, char **reply, int signalpid) @@ -149,7 +169,10 @@ int ustcomm_request_consumer(pid_t pid, const char *channel) return 0; } - +/* returns 1 to indicate a message was received + * returns 0 to indicate no message was received (cannot happen) + * returns -1 to indicate an error + */ static int recv_message_fd(int fd, char **msg, struct ustcomm_source *src) { @@ -159,7 +182,7 @@ static int recv_message_fd(int fd, char **msg, struct ustcomm_source *src) result = recv(fd, *msg, MSG_MAX, 0); if(result == -1) { - PERROR("recvfrom"); + PERROR("recv"); return -1; } @@ -170,7 +193,7 @@ static int recv_message_fd(int fd, char **msg, struct ustcomm_source *src) if(src) src->fd = fd; - return 0; + return 1; } int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_source *src) @@ -186,7 +209,14 @@ int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_ return 0; } -int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustcomm_source *src) +/* @timeout: max blocking time in milliseconds, -1 means infinity + * + * returns 1 to indicate a message was received + * returns 0 to indicate no message was received + * returns -1 to indicate an error + */ + +int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustcomm_source *src, int timeout) { struct pollfd *fds; struct ustcomm_connection *conn; @@ -218,12 +248,15 @@ int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustco idx++; } - result = poll(fds, n_fds, -1); + result = poll(fds, n_fds, timeout); if(result == -1) { PERROR("poll"); return -1; } + if(result == 0) + return 0; + if(fds[0].revents) { struct ustcomm_connection *newconn; int newfd; @@ -273,14 +306,14 @@ free_fds_return: return retval; } -int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src) +int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src, int timeout) { - return ustcomm_recv_message(&ustd->server, msg, src); + return ustcomm_recv_message(&ustd->server, msg, src, timeout); } -int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src) +int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src, int timeout) { - return ustcomm_recv_message(&app->server, msg, src); + return ustcomm_recv_message(&app->server, msg, src, timeout); } static int init_named_socket(char *name, char **path_out) diff --git a/libustcomm/ustcomm.h b/libustcomm/ustcomm.h index 91ac80d..e661a55 100644 --- a/libustcomm/ustcomm.h +++ b/libustcomm/ustcomm.h @@ -32,10 +32,12 @@ struct ustcomm_source { void *priv; }; +char *strdup_malloc(const char *s); + int send_message(pid_t pid, const char *msg, char **reply); -int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src); -int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src); +int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src, int timeout); +int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src, int timeout); int ustcomm_init_app(pid_t pid, struct ustcomm_app *handle); diff --git a/ustd/ustd.c b/ustd/ustd.c index 66b9dd3..1998119 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include @@ -24,6 +26,10 @@ struct buffer_info { int subbuf_size; int file_fd; /* output file */ + + struct list_head list; + + long consumed_old; }; int add_buffer(pid_t pid, char *bufname) @@ -32,6 +38,8 @@ int add_buffer(pid_t pid, char *bufname) char *send_msg; char *received_msg; int result; + char *tmp; + int fd; buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); if(buf == NULL) { @@ -90,9 +98,113 @@ int add_buffer(pid_t pid, char *bufname) } DBG("successfully attached memory"); + /* open file for output */ + asprintf(&tmp, "/tmp/trace/%s_0", buf->name); + result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600); + if(result == -1) { + PERROR("open"); + return -1; + } + buf->file_fd = fd; + free(tmp); + + list_add(&buf->list, &buffers); + return 0; } +int get_subbuffer(struct buffer_info *buf) +{ + char *send_msg; + char *received_msg; + char *rep_code; + int retval; + int result; + + asprintf(&send_msg, "get_subbuffer %s", buf->name); + result = send_message(buf->pid, send_msg, &received_msg); + if(result < 0) { + ERR("get_subbuffer: send_message failed"); + return -1; + } + free(send_msg); + + result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); + if(result != 2) { + ERR("unable to parse response to get_subbuffer"); + return -1; + } + free(received_msg); + + if(!strcmp(rep_code, "OK")) { + DBG("got subbuffer %s", buf->name); + retval = 1; + } + else { + DBG("did not get subbuffer %s", buf->name); + retval = 0; + } + + free(rep_code); + return retval; +} + +int put_subbuffer(struct buffer_info *buf) +{ + char *send_msg; + char *received_msg; + char *rep_code; + int retval; + int result; + + asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old); + result = send_message(buf->pid, send_msg, &received_msg); + if(result < 0) { + ERR("put_subbuffer: send_message failed"); + return -1; + } + free(send_msg); + + result = sscanf(received_msg, "%as", &rep_code); + if(result != 1) { + ERR("unable to parse response to put_subbuffer"); + return -1; + } + free(received_msg); + + if(!strcmp(rep_code, "OK")) { + DBG("subbuffer put %s", buf->name); + retval = 1; + } + else { + ERR("invalid response to put_subbuffer"); + } + + free(rep_code); + return retval; +} + +ssize_t patient_write(int fd, const void *buf, size_t count) +{ + const char *bufc = (const char *) buf; + int result; + + for(;;) { + result = write(fd, bufc, count); + if(result <= 0) { + return result; + } + count -= result; + bufc += result; + + if(count == 0) { + break; + } + } + + return bufc-(const char *)buf; +} + int main(int argc, char **argv) { struct ustcomm_ustd ustd; @@ -104,26 +216,61 @@ int main(int argc, char **argv) return 1; } + /* app loop */ for(;;) { char *recvbuf; + struct buffer_info *buf; - ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL); + /* 1. check for requests on our public socket */ + result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100); + if(result == -1) { + ERR("error in ustcomm_ustd_recv_message"); + continue; + } + if(result > 0) { + if(!strncmp(recvbuf, "collect", 7)) { + pid_t pid; + char *bufname; + int result; - if(!strncmp(recvbuf, "collect", 7)) { - pid_t pid; - char *bufname; - int result; + result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); + if(result != 2) { + fprintf(stderr, "parsing error: %s\n", recvbuf); + } - result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); - if(result != 2) { - fprintf(stderr, "parsing error: %s\n", recvbuf); + result = add_buffer(pid, bufname); + if(result < 0) { + ERR("error in add_buffer"); + continue; + } } - add_buffer(pid, bufname); - + free(recvbuf); } - free(recvbuf); + /* 2. try to consume data from tracing apps */ + list_for_each_entry(buf, &buffers, list) { + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } + if(result == 0) + continue; + + /* write data to file */ + //result = write(buf->file_fd, buf->, ); + result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size); + if(result == -1) { + PERROR("write"); + /* FIXME: maybe drop this trace */ + } + + result = put_subbuffer(buf); + if(result == -1) { + ERR("error putting subbuffer"); + } + } } return 0;