printf("Hello, World!\n");
sleep(1);
- for(i=0; i<5000; i++) {
+ for(i=0; i<50; i++) {
trace_mark(ust, bar, "str %s", "FOOBAZ");
trace_mark(ust, bar2, "number1 %d number2 %d", 53, 9800);
usleep(100000);
scanf("%*s");
+ ltt_trace_stop("auto");
+ ltt_trace_destroy("auto");
+
+ DBG("TRACE STOPPED");
+ scanf("%*s");
+
return 0;
}
consumer_channels[i].chan = chan;
snprintf(tmp, sizeof(tmp), "trace/%s_0", chan->channel_name);
- result = consumer_channels[i].fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00644);
+ result = consumer_channels[i].fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
if(result == -1) {
perror("open");
return -1;
sleep(1);
}
-
-// CPRINTF("consumer: got a trace: %s with %d channels\n", trace_name, trace->nr_channels);
-//
-// struct ltt_channel_struct *chan = &trace->channels[0];
-//
-// CPRINTF("channel 1 (%s) active=%u", chan->channel_name, chan->active & 1);
-
-// struct rchan *rchan = chan->trans_channel_data;
-// struct rchan_buf *rbuf = rchan->buf;
-// struct ltt_channel_buf_struct *lttbuf = chan->buf;
-// long consumed_old;
-//
-// result = fd = open("trace.out", O_WRONLY | O_CREAT | O_TRUNC, 00644);
-// if(result == -1) {
-// perror("open");
-// return -1;
-// }
-
-// for(;;) {
-// write(STDOUT_FILENO, str, sizeof(str));
-//
-// result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old);
-// if(result < 0) {
-// CPRINTF("ltt_do_get_subbuf: error: %s", strerror(-result));
-// }
-// else {
-// CPRINTF("success!");
-//
-// result = write(fd, rbuf->buf_data + (consumed_old & (2 * 4096-1)), 4096);
-// ltt_do_put_subbuf(rbuf, lttbuf, consumed_old);
-// }
-//
-// //CPRINTF("There seems to be %ld bytes available", SUBBUF_TRUNC(local_read(<tbuf->offset), rbuf->chan) - consumed_old);
-// CPRINTF("Commit count %ld", local_read(<tbuf->commit_count[0]));
-//
-//
-// sleep(1);
-// }
}
void start_consumer(void)
int len;
struct ustcomm_source src;
- result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src);
- DBG("HERE");
- if(result) {
+ result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src, -1);
+ if(result <= 0) {
WARN("error in ustcomm_app_recv_message");
continue;
}
DBG("received a message! it's: %s\n", recvbuf);
len = strlen(recvbuf);
- //if(len && recvbuf[len-1] == '\n') {
- // recvbuf[len-1] = '\0';
- //}
if(!strcmp(recvbuf, "print_markers")) {
print_markers();
DBG("load_probe_lib loading %s", libfile);
}
+ else if(nth_token_is(recvbuf, "get_subbuffer", 0) == 1) {
+ struct ltt_trace_struct *trace;
+ char trace_name[] = "auto";
+ int i;
+ char *channel_name;
+
+ DBG("get_subbuf");
+
+ channel_name = nth_token(recvbuf, 1);
+ if(channel_name == NULL) {
+ ERR("get_subbuf: cannot parse channel");
+ goto next_cmd;
+ }
+
+ ltt_lock_traces();
+ trace = _ltt_trace_find(trace_name);
+ ltt_unlock_traces();
+
+ if(trace == NULL) {
+ CPRINTF("cannot find trace!");
+ return 1;
+ }
+
+ for(i=0; i<trace->nr_channels; i++) {
+ struct rchan *rchan = trace->channels[i].trans_channel_data;
+
+ if(!strcmp(trace->channels[i].channel_name, channel_name)) {
+ struct rchan_buf *rbuf = rchan->buf;
+ struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf;
+ char *reply;
+ long consumed_old=0;
+
+ result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old);
+ if(result < 0) {
+ DBG("ltt_do_get_subbuf: error: %s", strerror(-result));
+ asprintf(&reply, "%s %ld", "UNAVAIL", 0);
+ }
+ else {
+ DBG("ltt_do_get_subbuf: success");
+ asprintf(&reply, "%s %ld", "OK", consumed_old);
+ }
+
+ result = ustcomm_send_reply(&ustcomm_app.server, reply, &src);
+ if(result) {
+ ERR("listener: get_subbuf: ustcomm_send_reply failed");
+ goto next_cmd;
+ }
+
+ free(reply);
+
+ break;
+ }
+ }
+ }
+ else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) {
+ struct ltt_trace_struct *trace;
+ char trace_name[] = "auto";
+ int i;
+ char *channel_name;
+ long consumed_old;
+ char *consumed_old_str;
+ char *endptr;
+
+ DBG("put_subbuf");
+
+ channel_name = strdup_malloc(nth_token(recvbuf, 1));
+ if(channel_name == NULL) {
+ ERR("put_subbuf_size: cannot parse channel");
+ goto next_cmd;
+ }
+
+ consumed_old_str = strdup_malloc(nth_token(recvbuf, 2));
+ if(consumed_old_str == NULL) {
+ ERR("put_subbuf: cannot parse consumed_old");
+ goto next_cmd;
+ }
+ consumed_old = strtol(consumed_old_str, &endptr, 10);
+ if(*endptr != '\0') {
+ ERR("put_subbuf: invalid value for consumed_old");
+ goto next_cmd;
+ }
+
+ ltt_lock_traces();
+ trace = _ltt_trace_find(trace_name);
+ ltt_unlock_traces();
+
+ if(trace == NULL) {
+ CPRINTF("cannot find trace!");
+ return 1;
+ }
+
+ for(i=0; i<trace->nr_channels; i++) {
+ struct rchan *rchan = trace->channels[i].trans_channel_data;
+
+ if(!strcmp(trace->channels[i].channel_name, channel_name)) {
+ struct rchan_buf *rbuf = rchan->buf;
+ struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf;
+ char *reply;
+ long consumed_old=0;
+
+ result = ltt_do_put_subbuf(rbuf, lttbuf, consumed_old);
+ if(result < 0) {
+ WARN("ltt_do_put_subbuf: error");
+ }
+ else {
+ DBG("ltt_do_put_subbuf: success");
+ }
+ asprintf(&reply, "%s", "OK", consumed_old);
+
+ result = ustcomm_send_reply(&ustcomm_app.server, reply, &src);
+ if(result) {
+ ERR("listener: put_subbuf: ustcomm_send_reply failed");
+ goto next_cmd;
+ }
+
+ free(reply);
+
+ break;
+ }
+ }
+
+ free(channel_name);
+ free(consumed_old_str);
+ }
+ else {
+ ERR("unable to parse message: %s", recvbuf);
+ }
next_cmd:
free(recvbuf);
}
/* FIXME: wait for the consumer to be done */
- sleep(1);
+ DBG("waiting 5 sec for consume");
+ sleep(5);
destroy_socket();
}
// backtrace_symbols_fd(buffer, result, STDERR_FILENO);
//}
+char *strdup_malloc(const char *s)
+{
+ char *retval;
+
+ if(s == NULL)
+ return NULL;
+
+ retval = (char *) malloc(strlen(s)+1);
+
+ strcpy(retval, s);
+
+ return retval;
+}
+
static void signal_process(pid_t pid)
{
int result;
PERROR("send");
return -1;
}
+ else if(result == 0) {
+ return 0;
+ }
if(!reply)
- return 0;
+ return 1;
*reply = (char *) malloc(MSG_MAX+1);
result = recv(fd, *reply, MSG_MAX, 0);
PERROR("recv");
return -1;
}
+ else if(result == 0) {
+ return 0;
+ }
(*reply)[result] = '\0';
- return 0;
+ return 1;
}
int send_message_path(const char *path, const char *msg, char **reply, int signalpid)
return 0;
}
-
+/* returns 1 to indicate a message was received
+ * returns 0 to indicate no message was received (cannot happen)
+ * returns -1 to indicate an error
+ */
static int recv_message_fd(int fd, char **msg, struct ustcomm_source *src)
{
result = recv(fd, *msg, MSG_MAX, 0);
if(result == -1) {
- PERROR("recvfrom");
+ PERROR("recv");
return -1;
}
if(src)
src->fd = fd;
- return 0;
+ return 1;
}
int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_source *src)
return 0;
}
-int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustcomm_source *src)
+/* @timeout: max blocking time in milliseconds, -1 means infinity
+ *
+ * returns 1 to indicate a message was received
+ * returns 0 to indicate no message was received
+ * returns -1 to indicate an error
+ */
+
+int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustcomm_source *src, int timeout)
{
struct pollfd *fds;
struct ustcomm_connection *conn;
idx++;
}
- result = poll(fds, n_fds, -1);
+ result = poll(fds, n_fds, timeout);
if(result == -1) {
PERROR("poll");
return -1;
}
+ if(result == 0)
+ return 0;
+
if(fds[0].revents) {
struct ustcomm_connection *newconn;
int newfd;
return retval;
}
-int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src)
+int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src, int timeout)
{
- return ustcomm_recv_message(&ustd->server, msg, src);
+ return ustcomm_recv_message(&ustd->server, msg, src, timeout);
}
-int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src)
+int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src, int timeout)
{
- return ustcomm_recv_message(&app->server, msg, src);
+ return ustcomm_recv_message(&app->server, msg, src, timeout);
}
static int init_named_socket(char *name, char **path_out)
void *priv;
};
+char *strdup_malloc(const char *s);
+
int send_message(pid_t pid, const char *msg, char **reply);
-int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src);
-int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src);
+int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src, int timeout);
+int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src, int timeout);
int ustcomm_init_app(pid_t pid, struct ustcomm_app *handle);
#include <sys/types.h>
#include <sys/shm.h>
+#include <fcntl.h>
+#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
int subbuf_size;
int file_fd; /* output file */
+
+ struct list_head list;
+
+ long consumed_old;
};
int add_buffer(pid_t pid, char *bufname)
char *send_msg;
char *received_msg;
int result;
+ char *tmp;
+ int fd;
buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
if(buf == NULL) {
}
DBG("successfully attached memory");
+ /* open file for output */
+ asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
+ result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
+ if(result == -1) {
+ PERROR("open");
+ return -1;
+ }
+ buf->file_fd = fd;
+ free(tmp);
+
+ list_add(&buf->list, &buffers);
+
return 0;
}
+int get_subbuffer(struct buffer_info *buf)
+{
+ char *send_msg;
+ char *received_msg;
+ char *rep_code;
+ int retval;
+ int result;
+
+ asprintf(&send_msg, "get_subbuffer %s", buf->name);
+ result = send_message(buf->pid, send_msg, &received_msg);
+ if(result < 0) {
+ ERR("get_subbuffer: send_message failed");
+ return -1;
+ }
+ free(send_msg);
+
+ result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
+ if(result != 2) {
+ ERR("unable to parse response to get_subbuffer");
+ return -1;
+ }
+ free(received_msg);
+
+ if(!strcmp(rep_code, "OK")) {
+ DBG("got subbuffer %s", buf->name);
+ retval = 1;
+ }
+ else {
+ DBG("did not get subbuffer %s", buf->name);
+ retval = 0;
+ }
+
+ free(rep_code);
+ return retval;
+}
+
+int put_subbuffer(struct buffer_info *buf)
+{
+ char *send_msg;
+ char *received_msg;
+ char *rep_code;
+ int retval;
+ int result;
+
+ asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
+ result = send_message(buf->pid, send_msg, &received_msg);
+ if(result < 0) {
+ ERR("put_subbuffer: send_message failed");
+ return -1;
+ }
+ free(send_msg);
+
+ result = sscanf(received_msg, "%as", &rep_code);
+ if(result != 1) {
+ ERR("unable to parse response to put_subbuffer");
+ return -1;
+ }
+ free(received_msg);
+
+ if(!strcmp(rep_code, "OK")) {
+ DBG("subbuffer put %s", buf->name);
+ retval = 1;
+ }
+ else {
+ ERR("invalid response to put_subbuffer");
+ }
+
+ free(rep_code);
+ return retval;
+}
+
+ssize_t patient_write(int fd, const void *buf, size_t count)
+{
+ const char *bufc = (const char *) buf;
+ int result;
+
+ for(;;) {
+ result = write(fd, bufc, count);
+ if(result <= 0) {
+ return result;
+ }
+ count -= result;
+ bufc += result;
+
+ if(count == 0) {
+ break;
+ }
+ }
+
+ return bufc-(const char *)buf;
+}
+
int main(int argc, char **argv)
{
struct ustcomm_ustd ustd;
return 1;
}
+ /* app loop */
for(;;) {
char *recvbuf;
+ struct buffer_info *buf;
- ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL);
+ /* 1. check for requests on our public socket */
+ result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
+ if(result == -1) {
+ ERR("error in ustcomm_ustd_recv_message");
+ continue;
+ }
+ if(result > 0) {
+ if(!strncmp(recvbuf, "collect", 7)) {
+ pid_t pid;
+ char *bufname;
+ int result;
- if(!strncmp(recvbuf, "collect", 7)) {
- pid_t pid;
- char *bufname;
- int result;
+ result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
+ if(result != 2) {
+ fprintf(stderr, "parsing error: %s\n", recvbuf);
+ }
- result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
- if(result != 2) {
- fprintf(stderr, "parsing error: %s\n", recvbuf);
+ result = add_buffer(pid, bufname);
+ if(result < 0) {
+ ERR("error in add_buffer");
+ continue;
+ }
}
- add_buffer(pid, bufname);
-
+ free(recvbuf);
}
- free(recvbuf);
+ /* 2. try to consume data from tracing apps */
+ list_for_each_entry(buf, &buffers, list) {
+ result = get_subbuffer(buf);
+ if(result == -1) {
+ ERR("error getting subbuffer");
+ continue;
+ }
+ if(result == 0)
+ continue;
+
+ /* write data to file */
+ //result = write(buf->file_fd, buf->, );
+ result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
+ if(result == -1) {
+ PERROR("write");
+ /* FIXME: maybe drop this trace */
+ }
+
+ result = put_subbuffer(buf);
+ if(result == -1) {
+ ERR("error putting subbuffer");
+ }
+ }
}
return 0;