Convert buffering system to per-cpu
authorPierre-Marc Fournier <pierre-marc.fournier@polymtl.ca>
Tue, 2 Feb 2010 02:12:40 +0000 (21:12 -0500)
committerPierre-Marc Fournier <pierre-marc.fournier@polymtl.ca>
Tue, 2 Feb 2010 02:12:40 +0000 (21:12 -0500)
The cpu count is fixed at trace allocation. If the current cpu
happens to be out of range, the event is put in the buffer of cpu0.

libust/buffers.c
libust/buffers.h
libust/channels.h
libust/serialize.c
libust/tracectl.c
libust/tracer.c
libust/tracer.h
ustd/ustd.c

index 504a79da344b14431197576a42685efcefcfcaf5..13660a8910402ad45e163c524aa845d37025a6df 100644 (file)
@@ -20,6 +20,7 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
  */
 
+#include <unistd.h>
 #include <sys/mman.h>
 #include <sys/ipc.h>
 #include <sys/shm.h>
 static DEFINE_MUTEX(ust_buffers_channels_mutex);
 static LIST_HEAD(ust_buffers_channels);
 
+static int get_n_cpus(void)
+{
+       int result;
+       static int n_cpus = 0;
+
+       if(n_cpus) {
+               return n_cpus;
+       }
+
+       result = sysconf(_SC_NPROCESSORS_ONLN);
+       if(result == -1) {
+               return -1;
+       }
+
+       n_cpus = result;
+
+       return result;
+}
+
 static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
                struct ust_channel *ltt_chan,
                struct ust_buffer *buf,
@@ -57,6 +77,7 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
                return -1;
        }
 
+       /* FIXME: should have matching call to shmdt */
        ptr = shmat(buf->shmid, NULL, 0);
        if(ptr == (void *) -1) {
                perror("shmat");
@@ -86,20 +107,19 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
        return -1;
 }
 
-static struct ust_buffer *ust_buffers_create_buf(struct ust_channel *channel)
+int ust_buffers_create_buf(struct ust_channel *channel, int cpu)
 {
        int result;
+       struct ust_buffer *buf = channel->buf[cpu];
 
-       result = ust_buffers_alloc_buf(channel->buf, &channel->alloc_size);
+       buf->cpu = cpu;
+       result = ust_buffers_alloc_buf(buf, &channel->alloc_size);
        if(result)
-               goto free_buf;
+               return -1;
 
-       ((struct ust_buffer *)channel->buf)->chan = channel;
+       buf->chan = channel;
        kref_get(&channel->kref);
-       return channel->buf;
-
-free_buf:
-       return NULL;
+       return 0;
 }
 
 static void ust_buffers_destroy_channel(struct kref *kref)
@@ -118,6 +138,7 @@ static void ust_buffers_destroy_buf(struct ust_buffer *buf)
                PERROR("munmap");
        }
 
+//ust//        chan->buf[buf->cpu] = NULL;
        free(buf);
        kref_put(&chan->kref, ust_buffers_destroy_channel);
 }
@@ -129,23 +150,21 @@ static void ust_buffers_remove_buf(struct kref *kref)
        ust_buffers_destroy_buf(buf);
 }
 
-static struct ust_buffer *ust_buffers_open_buf(struct ust_channel *chan)
+int ust_buffers_open_buf(struct ust_channel *chan, int cpu)
 {
-       struct ust_buffer *buf = NULL;
-       int err;
-
-       buf = ust_buffers_create_buf(chan);
-       if (!buf)
-               return NULL;
+       int result;
 
-       kref_init(&buf->kref);
+       result = ust_buffers_create_buf(chan, cpu);
+       if (result == -1)
+               return -1;
 
-       err = ust_buffers_init_buffer(chan->trace, chan, buf, chan->subbuf_cnt);
+       kref_init(&chan->buf[cpu]->kref);
 
-       if (err)
-               return ERR_PTR(err);
+       result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt);
+       if(result == -1)
+               return -1;
 
-       return buf;
+       return 0;
 
        /* FIXME: decrementally destroy on error? */
 }
@@ -161,6 +180,9 @@ static void ust_buffers_close_buf(struct ust_buffer *buf)
 
 int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt)
 {
+       int i;
+       int result;
+
        if(subbuf_size == 0 || subbuf_cnt == 0)
                return -1;
 
@@ -169,18 +191,27 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_
        chan->subbuf_size = subbuf_size;
        chan->subbuf_size_order = get_count_order(subbuf_size);
        chan->alloc_size = FIX_SIZE(subbuf_size * subbuf_cnt);
+
        kref_init(&chan->kref);
 
        mutex_lock(&ust_buffers_channels_mutex);
-       chan->buf = ust_buffers_open_buf(chan);
-       if (!chan->buf)
-               goto error;
+       for(i=0; i<chan->n_cpus; i++) {
+               result = ust_buffers_open_buf(chan, i);
+               if (result == -1)
+                       goto error;
+       }
        list_add(&chan->list, &ust_buffers_channels);
        mutex_unlock(&ust_buffers_channels_mutex);
 
        return 0;
 
-       error:
+       /* Jump directly inside the loop to close the buffers that were already
+        * opened. */
+       for(; i>=0; i--) {
+               ust_buffers_close_buf(chan->buf[i]);
+error:
+       }
+
        kref_put(&chan->kref, ust_buffers_destroy_channel);
        mutex_unlock(&ust_buffers_channels_mutex);
        return -1;
@@ -188,12 +219,17 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_
 
 void ust_buffers_channel_close(struct ust_channel *chan)
 {
-       if (!chan)
+       int i;
+       if(!chan)
                return;
 
        mutex_lock(&ust_buffers_channels_mutex);
-       if (chan->buf)
-               ust_buffers_close_buf(chan->buf);
+       for(i=0; i<chan->n_cpus; i++) {
+       /* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't
+        * initialize to NULL so we cannot use this check. Should we? */
+//ust//                if (chan->buf[i])
+                       ust_buffers_close_buf(chan->buf[i]);
+       }
 
        list_del(&chan->list);
        kref_put(&chan->kref, ust_buffers_destroy_channel);
@@ -216,6 +252,7 @@ void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
                len -= cpy;
                src += cpy;
                offset += cpy;
+
                WARN_ON(offset >= buf->buf_size);
 
                cpy = min_t(size_t, len, buf->buf_size - offset);
@@ -223,16 +260,6 @@ void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
        } while (unlikely(len != cpy));
 }
 
-/**
- * ltt_buffers_offset_address - get address of a location within the buffer
- * @buf : buffer
- * @offset : offset within the buffer.
- *
- * Return the address where a given offset is located.
- * Should be used to get the current subbuffer header pointer. Given we know
- * it's never on a page boundary, it's safe to write directly to this address,
- * as long as the write is never bigger than a page size.
- */
 void *ltt_buffers_offset_address(struct ust_buffer *buf, size_t offset)
 {
        return ((char *)buf->buf_data)+offset;
@@ -289,7 +316,7 @@ static inline int last_tsc_overflow(struct ust_buffer *ltt_buf,
  */
 enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
 
-static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan);
+static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu);
 
 static void ltt_force_switch(struct ust_buffer *buf,
                enum force_switch_mode mode);
@@ -449,9 +476,9 @@ int ust_buffers_do_put_subbuf(struct ust_buffer *buf, u32 uconsumed_old)
 
 static void ltt_relay_print_subbuffer_errors(
                struct ust_channel *channel,
-               long cons_off)
+               long cons_off, int cpu)
 {
-       struct ust_buffer *ltt_buf = channel->buf;
+       struct ust_buffer *ltt_buf = channel->buf[cpu];
        long cons_idx, commit_count, write_offset;
 
        cons_idx = SUBBUF_INDEX(cons_off, channel);
@@ -477,9 +504,9 @@ static void ltt_relay_print_subbuffer_errors(
 }
 
 static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
-               struct ust_channel *channel)
+               struct ust_channel *channel, int cpu)
 {
-       struct ust_buffer *ltt_buf = channel->buf;
+       struct ust_buffer *ltt_buf = channel->buf[cpu];
        long cons_off;
 
        /*
@@ -494,13 +521,13 @@ static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
                                      channel)
                         - cons_off) > 0;
                        cons_off = SUBBUF_ALIGN(cons_off, channel))
-               ltt_relay_print_subbuffer_errors(channel, cons_off);
+               ltt_relay_print_subbuffer_errors(channel, cons_off, cpu);
 }
 
-static void ltt_relay_print_buffer_errors(struct ust_channel *channel)
+static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu)
 {
        struct ltt_trace_struct *trace = channel->trace;
-       struct ust_buffer *ltt_buf = channel->buf;
+       struct ust_buffer *ltt_buf = channel->buf[cpu];
 
        if (local_read(&ltt_buf->events_lost))
                ERR("channel %s: %ld events lost",
@@ -511,7 +538,7 @@ static void ltt_relay_print_buffer_errors(struct ust_channel *channel)
                        channel->channel_name,
                        local_read(&ltt_buf->corrupted_subbuffers));
 
-       ltt_relay_print_errors(trace, channel);
+       ltt_relay_print_errors(trace, channel, cpu);
 }
 
 static void ltt_relay_release_channel(struct kref *kref)
@@ -616,14 +643,14 @@ static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
 }
 
 /* FIXME: use this function */
-static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan)
+static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu)
 {
        struct ltt_trace_struct *trace = ltt_chan->trace;
-       struct ust_buffer *ltt_buf = ltt_chan->buf;
+       struct ust_buffer *ltt_buf = ltt_chan->buf[cpu];
 
        kref_put(&ltt_chan->trace->ltt_transport_kref,
                ltt_release_transport);
-       ltt_relay_print_buffer_errors(ltt_chan);
+       ltt_relay_print_buffer_errors(ltt_chan, cpu);
 //ust//        free(ltt_buf->commit_seq);
        kfree(ltt_buf->commit_count);
        ltt_buf->commit_count = NULL;
@@ -632,47 +659,59 @@ static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan)
 //ust//        wake_up_interruptible(&trace->kref_wq);
 }
 
-static void ltt_chan_alloc_ltt_buf(struct ust_channel *chan)
+static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan)
 {
        void *ptr;
        int result;
+       size_t size;
+       int i;
 
-       /* Get one page */
-       /* FIXME: increase size if we have a seq_commit array that overflows the page */
-       size_t size = PAGE_ALIGN(1);
+       size = PAGE_ALIGN(1);
 
-       result = chan->buf_shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
-       if(chan->buf_shmid == -1) {
-               PERROR("shmget");
-               return;
-       }
+       for(i=0; i<chan->n_cpus; i++) {
 
-       ptr = shmat(chan->buf_shmid, NULL, 0);
-       if(ptr == (void *) -1) {
-               perror("shmat");
-               goto destroy_shmem;
-       }
+               result = chan->buf_struct_shmids[i] = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
+               if(result == -1) {
+                       PERROR("shmget");
+                       goto destroy_previous;
+               }
 
-       /* Already mark the shared memory for destruction. This will occur only
-         * when all users have detached.
-        */
-       result = shmctl(chan->buf_shmid, IPC_RMID, NULL);
-       if(result == -1) {
-               perror("shmctl");
-               return;
+               /* FIXME: should have matching call to shmdt */
+               ptr = shmat(chan->buf_struct_shmids[i], NULL, 0);
+               if(ptr == (void *) -1) {
+                       perror("shmat");
+                       goto destroy_shm;
+               }
+
+               /* Already mark the shared memory for destruction. This will occur only
+                * when all users have detached.
+                */
+               result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
+               if(result == -1) {
+                       perror("shmctl");
+                       goto destroy_previous;
+               }
+
+               chan->buf[i] = ptr;
        }
 
-       chan->buf = ptr;
+       return 0;
 
-       return;
+       /* Jumping inside this loop occurs from within the other loop above with i as
+        * counter, so it unallocates the structures for the cpu = current_i down to
+        * zero. */
+       for(; i>=0; i--) {
+               destroy_shm:
+               result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
+               if(result == -1) {
+                       perror("shmctl");
+               }
 
-       destroy_shmem:
-       result = shmctl(chan->buf_shmid, IPC_RMID, NULL);
-       if(result == -1) {
-               perror("shmctl");
+               destroy_previous:
+               continue;
        }
 
-       return;
+       return -1;
 }
 
 /*
@@ -682,7 +721,6 @@ static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_s
        const char *channel_name, struct ust_channel *ltt_chan,
        unsigned int subbuf_size, unsigned int n_subbufs, int overwrite)
 {
-       int err = 0;
        int result;
 
        kref_init(&ltt_chan->kref);
@@ -693,29 +731,40 @@ static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_s
        ltt_chan->overwrite = overwrite;
        ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
        ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
+       ltt_chan->n_cpus = get_n_cpus();
 //ust//        ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map);
+       ltt_chan->buf = (void *) malloc(ltt_chan->n_cpus * sizeof(void *));
+       if(ltt_chan->buf == NULL) {
+               goto error;
+       }
+       ltt_chan->buf_struct_shmids = (int *) malloc(ltt_chan->n_cpus * sizeof(int));
+       if(ltt_chan->buf_struct_shmids == NULL)
+               goto free_buf;
 
-       ltt_chan_alloc_ltt_buf(ltt_chan);
+       result = ust_buffers_alloc_channel_buf_structs(ltt_chan);
+       if(result != 0) {
+               goto free_buf_struct_shmids;
+       }
 
-//ust//        ltt_chan->buf = malloc(sizeof(struct ltt_channel_buf_struct));
-       if (!ltt_chan->buf)
-               goto alloc_error;
-       /* FIXME: handle error of this call */
        result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs);
-       if (result == -1) {
+       if (result != 0) {
                ERR("Cannot open channel for trace %s", trace_name);
-               goto relay_open_error;
+               goto unalloc_buf_structs;
        }
 
-       err = 0;
-       goto end;
+       return 0;
+
+unalloc_buf_structs:
+       /* FIXME: put a call here to unalloc the buf structs! */
+
+free_buf_struct_shmids:
+       free(ltt_chan->buf_struct_shmids);
 
-relay_open_error:
-//ust//        percpu_free(ltt_chan->buf);
-alloc_error:
-       err = EPERM;
-end:
-       return err;
+free_buf:
+       free(ltt_chan->buf);
+
+error:
+       return -1;
 }
 
 /*
@@ -754,12 +803,12 @@ static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel)
 //ust//        }
 }
 
-static void ltt_relay_finish_buffer(struct ust_channel *channel)
+static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cpu)
 {
 //     int result;
 
-       if (channel->buf) {
-               struct ust_buffer *buf = channel->buf;
+       if (channel->buf[cpu]) {
+               struct ust_buffer *buf = channel->buf[cpu];
                ltt_relay_buffer_flush(buf);
 //ust//                ltt_relay_wake_writers(ltt_buf);
                /* closing the pipe tells the consumer the buffer is finished */
@@ -776,10 +825,11 @@ static void ltt_relay_finish_buffer(struct ust_channel *channel)
 
 static void ltt_relay_finish_channel(struct ust_channel *channel)
 {
-//ust//        unsigned int i;
+       unsigned int i;
 
-//ust//        for_each_possible_cpu(i)
-               ltt_relay_finish_buffer(channel);
+       for(i=0; i<channel->n_cpus; i++) {
+               ltt_relay_finish_buffer(channel, i);
+       }
 }
 
 static void ltt_relay_remove_channel(struct ust_channel *channel)
@@ -1182,9 +1232,9 @@ static inline void ltt_reserve_end_switch_current(
 static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
                struct ust_channel *channel, void **transport_data,
                size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
-               unsigned int *rflags, int largest_align)
+               unsigned int *rflags, int largest_align, int cpu)
 {
-       struct ust_buffer *buf = *transport_data = channel->buf;
+       struct ust_buffer *buf = *transport_data = channel->buf[cpu];
        struct ltt_reserve_switch_offsets offsets;
 
        offsets.reserve_commit_diff = 0;
index 019126282065f6fd6c237a88670963ed1e45f8c4..c6b13d076ecbc26d445efec8c5cbd96775cc732b 100644 (file)
@@ -69,6 +69,7 @@ struct ust_buffer {
        void *buf_data;
        size_t buf_size;
        int shmid;
+       unsigned int cpu;
 
        /* commit count per subbuffer; must be at end of struct */
        local_t commit_seq[0] ____cacheline_aligned;
index 893d8c1193b544c35ff063cbb06e1a37f3de9a7d..6f57f13de6de2b7c58800b5bda5d284f7f0f31ea 100644 (file)
@@ -28,6 +28,7 @@
 #include <kcompat/kref.h>
 
 #define EVENTS_PER_CHANNEL     65536
+#define MAX_CPUS               32
 
 struct ltt_trace_struct;
 
@@ -36,7 +37,8 @@ struct ust_buffer;
 struct ust_channel {
        /* First 32 bytes cache-hot cacheline */
        struct ltt_trace_struct *trace;
-       void *buf;
+       int *buf_struct_shmids;
+       struct ust_buffer **buf;
        int overwrite:1;
        int active:1;
        unsigned int n_subbufs_order;
@@ -65,8 +67,7 @@ struct ust_channel {
        int subbuf_size_order;
        unsigned int subbuf_cnt;
        const char *channel_name;
-
-       int buf_shmid;
+       int n_cpus;
 
        u32 version;
        size_t alloc_size;
index d603815b50621e74332f96301d1b6b9a38266200..9cb6bcaed6cd52794a01ebbc80635c197fb4b6d1 100644 (file)
@@ -24,6 +24,9 @@
  * va_list * to ltt_vtrace.
  */
 
+#define _GNU_SOURCE
+#include <unistd.h>
+#include <sys/syscall.h>
 #include <stdarg.h>
 #include <string.h>
 #include <stdint.h>
@@ -46,6 +49,11 @@ enum ltt_type {
        LTT_TYPE_NONE,
 };
 
+static int ust_get_cpu(void)
+{
+       return sched_getcpu();
+}
+
 #define LTT_ATTRIBUTE_NETWORK_BYTE_ORDER (1<<1)
 
 /*
@@ -684,7 +692,7 @@ notrace void ltt_vtrace(const struct marker *mdata, void *probe_data,
        struct ltt_serialize_closure closure;
        struct ltt_probe_private_data *private_data = call_data;
        void *serialize_private = NULL;
-//ust//        int cpu;
+       int cpu;
        unsigned int rflags;
 
        /*
@@ -696,7 +704,9 @@ notrace void ltt_vtrace(const struct marker *mdata, void *probe_data,
 
        rcu_read_lock(); //ust// rcu_read_lock_sched_notrace();
 //ust//        cpu = smp_processor_id();
+       cpu = ust_get_cpu();
 //ust//        __get_cpu_var(ltt_nesting)++;
+       /* FIXME: should nesting be per-cpu? */
        ltt_nesting++;
 
        pdata = (struct ltt_active_marker *)probe_data;
@@ -750,18 +760,26 @@ notrace void ltt_vtrace(const struct marker *mdata, void *probe_data,
                if (!channel->active)
                        continue;
 
+               /* If a new cpu was plugged since the trace was started, we did
+                * not add it to the trace, and therefore we write the event to
+                * cpu 0.
+                */
+               if(cpu >= channel->n_cpus) {
+                       cpu = 0;
+               }
+
                /* reserve space : header and data */
                ret = ltt_reserve_slot(trace, channel, &transport_data,
                                        data_size, &slot_size, &buf_offset,
                                        &tsc, &rflags,
-                                       largest_align);
+                                       largest_align, cpu);
                if (unlikely(ret < 0))
                        continue; /* buffer full */
 
                va_copy(args_copy, *args);
                /* FIXME : could probably encapsulate transport better. */
 //ust//                buf = ((struct rchan *)channel->trans_channel_data)->buf[cpu];
-               buf = channel->buf;
+               buf = channel->buf[cpu];
                /* Out-of-order write : header and data */
                buf_offset = ltt_write_event_header(trace,
                                        buf, buf_offset,
index 584a94cff8067e36626bd537bb7ac1d48731ee25..9a996d3449b8e87cb934dbcb3cdf286735144bbf 100644 (file)
@@ -165,7 +165,7 @@ void notif_cb(void)
 
 static void inform_consumer_daemon(const char *trace_name)
 {
-       int i;
+       int i,j;
        struct ltt_trace_struct *trace;
        pid_t pid = getpid();
        int result;
@@ -179,12 +179,18 @@ static void inform_consumer_daemon(const char *trace_name)
        }
 
        for(i=0; i < trace->nr_channels; i++) {
-               result = ustcomm_request_consumer(pid, trace->channels[i].channel_name);
-               if(result == -1) {
-                       WARN("Failed to request collection for channel %s. Is the daemon available?", trace->channels[i].channel_name);
-                       /* continue even if fail */
+               /* iterate on all cpus */
+               for(j=0; j<trace->channels[i].n_cpus; j++) {
+                       char *buf;
+                       asprintf(&buf, "%s_%d", trace->channels[i].channel_name, j);
+                       result = ustcomm_request_consumer(pid, buf);
+                       if(result == -1) {
+                               WARN("Failed to request collection for channel %s. Is the daemon available?", trace->channels[i].channel_name);
+                               /* continue even if fail */
+                       }
+                       free(buf);
+                       buffers_to_export++;
                }
-               buffers_to_export++;
        }
 
        finish:
@@ -273,6 +279,408 @@ void process_blocked_consumers(void)
 
 }
 
+void seperate_channel_cpu(const char *channel_and_cpu, char **channel, int *cpu)
+{
+       const char *sep;
+
+       sep = rindex(channel_and_cpu, '_');
+       if(sep == NULL) {
+               *cpu = -1;
+               sep = channel_and_cpu + strlen(channel_and_cpu);
+       }
+       else {
+               *cpu = atoi(sep+1);
+       }
+
+       asprintf(channel, "%.*s", (int)(sep-channel_and_cpu), channel_and_cpu);
+}
+
+static int do_cmd_get_shmid(const char *recvbuf, struct ustcomm_source *src)
+{
+       int retval = 0;
+       struct ltt_trace_struct *trace;
+       char trace_name[] = "auto";
+       int i;
+       char *channel_and_cpu;
+       int found = 0;
+       int result;
+       char *ch_name;
+       int ch_cpu;
+
+       DBG("get_shmid");
+
+       channel_and_cpu = nth_token(recvbuf, 1);
+       if(channel_and_cpu == NULL) {
+               ERR("get_shmid: cannot parse channel");
+               goto end;
+       }
+
+       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
+       if(ch_cpu == -1) {
+               ERR("Problem parsing channel name");
+               goto free_short_chan_name;
+       }
+
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
+       ltt_unlock_traces();
+
+       if(trace == NULL) {
+               ERR("cannot find trace!");
+               retval = -1;
+               goto free_short_chan_name;
+       }
+
+       for(i=0; i<trace->nr_channels; i++) {
+               struct ust_channel *channel = &trace->channels[i];
+               struct ust_buffer *buf = channel->buf[ch_cpu];
+
+               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
+                       char *reply;
+
+//                     DBG("the shmid for the requested channel is %d", buf->shmid);
+//                     DBG("the shmid for its buffer structure is %d", channel->buf_struct_shmids);
+                       asprintf(&reply, "%d %d", buf->shmid, channel->buf_struct_shmids[ch_cpu]);
+
+                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+                       if(result) {
+                               ERR("listener: get_shmid: ustcomm_send_reply failed");
+                               free(reply);
+                               retval = -1;
+                               goto free_short_chan_name;
+                       }
+
+                       free(reply);
+
+                       found = 1;
+                       break;
+               }
+       }
+
+       if(found) {
+               buffers_to_export--;
+       }
+       else {
+               ERR("get_shmid: channel not found (%s)", channel_and_cpu);
+       }
+
+       free_short_chan_name:
+       free(ch_name);
+
+       end:
+       return retval;
+}
+
+static int do_cmd_get_n_subbufs(const char *recvbuf, struct ustcomm_source *src)
+{
+       int retval = 0;
+       struct ltt_trace_struct *trace;
+       char trace_name[] = "auto";
+       int i;
+       char *channel_and_cpu;
+       int found = 0;
+       int result;
+       char *ch_name;
+       int ch_cpu;
+
+       DBG("get_n_subbufs");
+
+       channel_and_cpu = nth_token(recvbuf, 1);
+       if(channel_and_cpu == NULL) {
+               ERR("get_n_subbufs: cannot parse channel");
+               goto end;
+       }
+
+       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
+       if(ch_cpu == -1) {
+               ERR("Problem parsing channel name");
+               goto free_short_chan_name;
+       }
+
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
+       ltt_unlock_traces();
+
+       if(trace == NULL) {
+               ERR("cannot find trace!");
+               retval = -1;
+               goto free_short_chan_name;
+       }
+
+       for(i=0; i<trace->nr_channels; i++) {
+               struct ust_channel *channel = &trace->channels[i];
+
+               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
+                       char *reply;
+
+                       DBG("the n_subbufs for the requested channel is %d", channel->subbuf_cnt);
+                       asprintf(&reply, "%d", channel->subbuf_cnt);
+
+                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+                       if(result) {
+                               ERR("listener: get_n_subbufs: ustcomm_send_reply failed");
+                               free(reply);
+                               retval = -1;
+                               goto free_short_chan_name;
+                       }
+
+                       free(reply);
+                       found = 1;
+                       break;
+               }
+       }
+       if(found == 0) {
+               ERR("get_n_subbufs: unable to find channel");
+       }
+
+       free_short_chan_name:
+       free(ch_name);
+
+       end:
+       return retval;
+}
+
+static int do_cmd_get_subbuf_size(const char *recvbuf, struct ustcomm_source *src)
+{
+       int retval = 0;
+       struct ltt_trace_struct *trace;
+       char trace_name[] = "auto";
+       int i;
+       char *channel_and_cpu;
+       int found = 0;
+       int result;
+       char *ch_name;
+       int ch_cpu;
+
+       DBG("get_subbuf_size");
+
+       channel_and_cpu = nth_token(recvbuf, 1);
+       if(channel_and_cpu == NULL) {
+               ERR("get_subbuf_size: cannot parse channel");
+               goto end;
+       }
+
+       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
+       if(ch_cpu == -1) {
+               ERR("Problem parsing channel name");
+               goto free_short_chan_name;
+       }
+
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
+       ltt_unlock_traces();
+
+       if(trace == NULL) {
+               ERR("cannot find trace!");
+               retval = -1;
+               goto free_short_chan_name;
+       }
+
+       for(i=0; i<trace->nr_channels; i++) {
+               struct ust_channel *channel = &trace->channels[i];
+
+               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
+                       char *reply;
+
+                       DBG("the subbuf_size for the requested channel is %zd", channel->subbuf_size);
+                       asprintf(&reply, "%zd", channel->subbuf_size);
+
+                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+                       if(result) {
+                               ERR("listener: get_subbuf_size: ustcomm_send_reply failed");
+                               free(reply);
+                               retval = -1;
+                               goto free_short_chan_name;
+                       }
+
+                       free(reply);
+                       found = 1;
+                       break;
+               }
+       }
+       if(found == 0) {
+               ERR("get_subbuf_size: unable to find channel");
+       }
+
+       free_short_chan_name:
+       free(ch_name);
+
+       end:
+       return retval;
+}
+
+static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
+{
+       int retval = 0;
+       struct ltt_trace_struct *trace;
+       char trace_name[] = "auto";
+       int i;
+       char *channel_and_cpu;
+       int found = 0;
+       char *ch_name;
+       int ch_cpu;
+
+       DBG("get_subbuf");
+
+       channel_and_cpu = nth_token(recvbuf, 1);
+       if(channel_and_cpu == NULL) {
+               ERR("get_subbuf: cannot parse channel");
+               goto end;
+       }
+
+       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
+       if(ch_cpu == -1) {
+               ERR("Problem parsing channel name");
+               goto free_short_chan_name;
+       }
+
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
+       ltt_unlock_traces();
+
+       if(trace == NULL) {
+               ERR("cannot find trace!");
+               retval = -1;
+               goto free_short_chan_name;
+       }
+
+       for(i=0; i<trace->nr_channels; i++) {
+               struct ust_channel *channel = &trace->channels[i];
+
+               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
+                       struct ust_buffer *buf = channel->buf[ch_cpu];
+                       struct blocked_consumer *bc;
+
+                       found = 1;
+
+                       bc = (struct blocked_consumer *) malloc(sizeof(struct blocked_consumer));
+                       if(bc == NULL) {
+                               ERR("malloc returned NULL");
+                               goto free_short_chan_name;
+                       }
+                       bc->fd_consumer = src->fd;
+                       bc->fd_producer = buf->data_ready_fd_read;
+                       bc->buf = buf;
+                       bc->src = *src;
+                       bc->server = ustcomm_app.server;
+
+                       list_add(&bc->list, &blocked_consumers);
+
+                       break;
+               }
+       }
+       if(found == 0) {
+               ERR("get_subbuf: unable to find channel");
+       }
+
+       free_short_chan_name:
+       free(ch_name);
+
+       end:
+       return retval;
+}
+
+static int do_cmd_put_subbuffer(const char *recvbuf, struct ustcomm_source *src)
+{
+       int retval = 0;
+       struct ltt_trace_struct *trace;
+       char trace_name[] = "auto";
+       int i;
+       char *channel_and_cpu;
+       int found = 0;
+       int result;
+       char *ch_name;
+       int ch_cpu;
+       long consumed_old;
+       char *consumed_old_str;
+       char *endptr;
+
+       DBG("put_subbuf");
+
+       channel_and_cpu = strdup_malloc(nth_token(recvbuf, 1));
+       if(channel_and_cpu == NULL) {
+               ERR("put_subbuf_size: cannot parse channel");
+               retval = -1;
+               goto end;
+       }
+
+       consumed_old_str = strdup_malloc(nth_token(recvbuf, 2));
+       if(consumed_old_str == NULL) {
+               ERR("put_subbuf: cannot parse consumed_old");
+               retval = -1;
+               goto free_channel_and_cpu;
+       }
+       consumed_old = strtol(consumed_old_str, &endptr, 10);
+       if(*endptr != '\0') {
+               ERR("put_subbuf: invalid value for consumed_old");
+               retval = -1;
+               goto free_consumed_old_str;
+       }
+
+       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
+       if(ch_cpu == -1) {
+               ERR("Problem parsing channel name");
+               retval = -1;
+               goto free_short_chan_name;
+       }
+
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
+       ltt_unlock_traces();
+
+       if(trace == NULL) {
+               ERR("cannot find trace!");
+               retval = -1;
+               goto free_short_chan_name;
+       }
+
+       for(i=0; i<trace->nr_channels; i++) {
+               struct ust_channel *channel = &trace->channels[i];
+
+               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
+                       struct ust_buffer *buf = channel->buf[ch_cpu];
+                       char *reply;
+                       long consumed_old=0;
+
+                       found = 1;
+
+                       result = ust_buffers_do_put_subbuf(buf, consumed_old);
+                       if(result < 0) {
+                               WARN("ust_buffers_do_put_subbuf: error (subbuf=%s)", channel_and_cpu);
+                               asprintf(&reply, "%s", "ERROR");
+                       }
+                       else {
+                               DBG("ust_buffers_do_put_subbuf: success (subbuf=%s)", channel_and_cpu);
+                               asprintf(&reply, "%s", "OK");
+                       }
+
+                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+                       if(result) {
+                               ERR("listener: put_subbuf: ustcomm_send_reply failed");
+                               free(reply);
+                               retval = -1;
+                               goto free_channel_and_cpu;
+                       }
+
+                       free(reply);
+                       break;
+               }
+       }
+       if(found == 0) {
+               ERR("get_subbuf_size: unable to find channel");
+       }
+
+       free_channel_and_cpu:
+       free(channel_and_cpu);
+       free_consumed_old_str:
+       free(consumed_old_str);
+       free_short_chan_name:
+       free(ch_name);
+
+       end:
+       return retval;
+}
+
 void *listener_main(void *p)
 {
        int result;
@@ -423,140 +831,13 @@ void *listener_main(void *p)
                        }
                }
                else if(nth_token_is(recvbuf, "get_shmid", 0) == 1) {
-                       struct ltt_trace_struct *trace;
-                       char trace_name[] = "auto";
-                       int i;
-                       char *channel_name;
-
-                       DBG("get_shmid");
-
-                       channel_name = nth_token(recvbuf, 1);
-                       if(channel_name == NULL) {
-                               ERR("get_shmid: cannot parse channel");
-                               goto next_cmd;
-                       }
-
-                       ltt_lock_traces();
-                       trace = _ltt_trace_find(trace_name);
-                       ltt_unlock_traces();
-
-                       if(trace == NULL) {
-                               ERR("cannot find trace!");
-                               return (void *)1;
-                       }
-
-                       for(i=0; i<trace->nr_channels; i++) {
-                               struct ust_channel *channel = &trace->channels[i];
-                               struct ust_buffer *buf = channel->buf;
-
-                               if(!strcmp(trace->channels[i].channel_name, channel_name)) {
-                                       char *reply;
-
-                                       DBG("the shmid for the requested channel is %d", buf->shmid);
-                                       DBG("the shmid for its buffer structure is %d", channel->buf_shmid);
-                                       asprintf(&reply, "%d %d", buf->shmid, channel->buf_shmid);
-
-                                       result = ustcomm_send_reply(&ustcomm_app.server, reply, &src);
-                                       if(result) {
-                                               ERR("listener: get_shmid: ustcomm_send_reply failed");
-                                               goto next_cmd;
-                                       }
-
-                                       free(reply);
-
-                                       break;
-                               }
-                       }
-
-                       buffers_to_export--;
+                       do_cmd_get_shmid(recvbuf, &src);
                }
                else if(nth_token_is(recvbuf, "get_n_subbufs", 0) == 1) {
-                       struct ltt_trace_struct *trace;
-                       char trace_name[] = "auto";
-                       int i;
-                       char *channel_name;
-
-                       DBG("get_n_subbufs");
-
-                       channel_name = nth_token(recvbuf, 1);
-                       if(channel_name == NULL) {
-                               ERR("get_n_subbufs: cannot parse channel");
-                               goto next_cmd;
-                       }
-
-                       ltt_lock_traces();
-                       trace = _ltt_trace_find(trace_name);
-                       ltt_unlock_traces();
-
-                       if(trace == NULL) {
-                               ERR("cannot find trace!");
-                               return (void *)1;
-                       }
-
-                       for(i=0; i<trace->nr_channels; i++) {
-                               struct ust_channel *channel = &trace->channels[i];
-
-                               if(!strcmp(trace->channels[i].channel_name, channel_name)) {
-                                       char *reply;
-
-                                       DBG("the n_subbufs for the requested channel is %d", channel->subbuf_cnt);
-                                       asprintf(&reply, "%d", channel->subbuf_cnt);
-
-                                       result = ustcomm_send_reply(&ustcomm_app.server, reply, &src);
-                                       if(result) {
-                                               ERR("listener: get_n_subbufs: ustcomm_send_reply failed");
-                                               goto next_cmd;
-                                       }
-
-                                       free(reply);
-
-                                       break;
-                               }
-                       }
+                       do_cmd_get_n_subbufs(recvbuf, &src);
                }
                else if(nth_token_is(recvbuf, "get_subbuf_size", 0) == 1) {
-                       struct ltt_trace_struct *trace;
-                       char trace_name[] = "auto";
-                       int i;
-                       char *channel_name;
-
-                       DBG("get_subbuf_size");
-
-                       channel_name = nth_token(recvbuf, 1);
-                       if(channel_name == NULL) {
-                               ERR("get_subbuf_size: cannot parse channel");
-                               goto next_cmd;
-                       }
-
-                       ltt_lock_traces();
-                       trace = _ltt_trace_find(trace_name);
-                       ltt_unlock_traces();
-
-                       if(trace == NULL) {
-                               ERR("cannot find trace!");
-                               return (void *)1;
-                       }
-
-                       for(i=0; i<trace->nr_channels; i++) {
-                               struct ust_channel *channel = &trace->channels[i];
-
-                               if(!strcmp(trace->channels[i].channel_name, channel_name)) {
-                                       char *reply;
-
-                                       DBG("the subbuf_size for the requested channel is %zd", channel->subbuf_size);
-                                       asprintf(&reply, "%zd", channel->subbuf_size);
-
-                                       result = ustcomm_send_reply(&ustcomm_app.server, reply, &src);
-                                       if(result) {
-                                               ERR("listener: get_subbuf_size: ustcomm_send_reply failed");
-                                               goto next_cmd;
-                                       }
-
-                                       free(reply);
-
-                                       break;
-                               }
-                       }
+                       do_cmd_get_subbuf_size(recvbuf, &src);
                }
                else if(nth_token_is(recvbuf, "load_probe_lib", 0) == 1) {
                        char *libfile;
@@ -564,123 +845,14 @@ void *listener_main(void *p)
                        libfile = nth_token(recvbuf, 1);
 
                        DBG("load_probe_lib loading %s", libfile);
+
+                       free(libfile);
                }
                else if(nth_token_is(recvbuf, "get_subbuffer", 0) == 1) {
-                       struct ltt_trace_struct *trace;
-                       char trace_name[] = "auto";
-                       int i;
-                       char *channel_name;
-
-                       DBG("get_subbuf");
-
-                       channel_name = nth_token(recvbuf, 1);
-                       if(channel_name == NULL) {
-                               ERR("get_subbuf: cannot parse channel");
-                               goto next_cmd;
-                       }
-
-                       ltt_lock_traces();
-                       trace = _ltt_trace_find(trace_name);
-                       ltt_unlock_traces();
-
-                       if(trace == NULL) {
-                               ERR("cannot find trace!");
-                               return (void *)1;
-                       }
-
-                       for(i=0; i<trace->nr_channels; i++) {
-                               struct ust_channel *channel = &trace->channels[i];
-
-                               if(!strcmp(trace->channels[i].channel_name, channel_name)) {
-                                       struct ust_buffer *buf = channel->buf;
-                                       struct blocked_consumer *bc;
-
-                                       bc = (struct blocked_consumer *) malloc(sizeof(struct blocked_consumer));
-                                       if(bc == NULL) {
-                                               ERR("malloc returned NULL");
-                                               goto next_cmd;
-                                       }
-                                       bc->fd_consumer = src.fd;
-                                       bc->fd_producer = buf->data_ready_fd_read;
-                                       bc->buf = buf;
-                                       bc->src = src;
-                                       bc->server = ustcomm_app.server;
-
-                                       list_add(&bc->list, &blocked_consumers);
-
-                                       break;
-                               }
-                       }
+                       do_cmd_get_subbuffer(recvbuf, &src);
                }
                else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) {
-                       struct ltt_trace_struct *trace;
-                       char trace_name[] = "auto";
-                       int i;
-                       char *channel_name;
-                       long consumed_old;
-                       char *consumed_old_str;
-                       char *endptr;
-
-                       DBG("put_subbuf");
-
-                       channel_name = strdup_malloc(nth_token(recvbuf, 1));
-                       if(channel_name == NULL) {
-                               ERR("put_subbuf_size: cannot parse channel");
-                               goto next_cmd;
-                       }
-
-                       consumed_old_str = strdup_malloc(nth_token(recvbuf, 2));
-                       if(consumed_old_str == NULL) {
-                               ERR("put_subbuf: cannot parse consumed_old");
-                               goto next_cmd;
-                       }
-                       consumed_old = strtol(consumed_old_str, &endptr, 10);
-                       if(*endptr != '\0') {
-                               ERR("put_subbuf: invalid value for consumed_old");
-                               goto next_cmd;
-                       }
-
-                       ltt_lock_traces();
-                       trace = _ltt_trace_find(trace_name);
-                       ltt_unlock_traces();
-
-                       if(trace == NULL) {
-                               ERR("cannot find trace!");
-                               return (void *)1;
-                       }
-
-                       for(i=0; i<trace->nr_channels; i++) {
-                               struct ust_channel *channel = &trace->channels[i];
-
-                               if(!strcmp(trace->channels[i].channel_name, channel_name)) {
-                                       struct ust_buffer *buf = channel->buf;
-                                       char *reply;
-                                       long consumed_old=0;
-
-                                       result = ust_buffers_do_put_subbuf(buf, consumed_old);
-                                       if(result < 0) {
-                                               WARN("ust_buffers_do_put_subbuf: error (subbuf=%s)", channel_name);
-                                               asprintf(&reply, "%s", "ERROR");
-                                       }
-                                       else {
-                                               DBG("ust_buffers_do_put_subbuf: success (subbuf=%s)", channel_name);
-                                               asprintf(&reply, "%s", "OK");
-                                       }
-
-                                       result = ustcomm_send_reply(&ustcomm_app.server, reply, &src);
-                                       if(result) {
-                                               ERR("listener: put_subbuf: ustcomm_send_reply failed");
-                                               goto next_cmd;
-                                       }
-
-                                       free(reply);
-
-                                       break;
-                               }
-                       }
-
-                       free(channel_name);
-                       free(consumed_old_str);
+                       do_cmd_put_subbuffer(recvbuf, &src);
                }
                else if(nth_token_is(recvbuf, "enable_marker", 0) == 1) {
                        char *channel_slash_name = nth_token(recvbuf, 1);
@@ -1190,7 +1362,6 @@ void ust_before_fork(ust_fork_info_t *fork_info)
 static void ust_after_fork_common(ust_fork_info_t *fork_info)
 {
        int result;
-       sigset_t orig_sigs;
 
         /* Restore signals */
         result = sigprocmask(SIG_SETMASK, &fork_info->orig_sigs, NULL);
index aec55f43223b382432b298b43dce170d1d75506b..5e4e1c0919e207f8eed2d54fcdcf861d6ba8c4e8 100644 (file)
@@ -680,7 +680,7 @@ int ltt_trace_alloc(const char *trace_name)
 //ust//        local_irq_save(flags);
        trace->start_freq = trace_clock_frequency();
        trace->start_tsc = trace_clock_read64();
-       gettimeofday(&trace->start_time, NULL); //ust// changed
+       gettimeofday(&trace->start_time, NULL); //ust// changed /* FIXME: is this ok? */
 //ust//        local_irq_restore(flags);
 
        for (chan = 0; chan < trace->nr_channels; chan++) {
index b0767543eefbbcd8095a3555d4ae03afc5263a5d..b1fbf107c0ae7beba094729ee5ba4144f04345fc 100644 (file)
@@ -133,7 +133,7 @@ struct ltt_trace_ops {
                                void **transport_data, size_t data_size,
                                size_t *slot_size, long *buf_offset, u64 *tsc,
                                unsigned int *rflags,
-                               int largest_align);
+                               int largest_align, int cpu);
 //ust//        void (*commit_slot) (struct ltt_channel_struct *channel,
 //ust//                                void **transport_data, long buf_offset,
 //ust//                                size_t slot_size);
@@ -154,7 +154,7 @@ struct ltt_trace_ops {
        void (*remove_channel) (struct ust_channel *channel);
        void (*user_errors) (struct ltt_trace_struct *trace,
                                unsigned int index, size_t data_size,
-                               struct user_dbg_data *dbg);
+                               struct user_dbg_data *dbg, unsigned int cpu);
 } ____cacheline_aligned;
 
 struct ltt_transport {
@@ -457,11 +457,11 @@ static __inline__ int ltt_reserve_slot(
                long *buf_offset,
                u64 *tsc,
                unsigned int *rflags,
-               int largest_align)
+               int largest_align, int cpu)
 {
        return trace->ops->reserve_slot(trace, channel, transport_data,
                        data_size, slot_size, buf_offset, tsc, rflags,
-                       largest_align);
+                       largest_align, cpu);
 }
 
 
index 26f87a2ba3f991f0e7a5acba10aa8a6d3b539b17..969c192abdb66bea7b9ce93dd48d7a994b5fe70c 100644 (file)
@@ -259,7 +259,7 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname)
 
        result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
        if(result != 2) {
-               ERR("unable to parse response to get_shmid");
+               ERR("unable to parse response to get_shmid (\"%s\")", received_msg);
                return NULL;
        }
        free(received_msg);
@@ -342,7 +342,7 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname)
        }
        free(tmp);
 
-       asprintf(&tmp, "%s/%u_%lld/%s_0", trace_path, buf->pid, buf->pidunique, buf->name);
+       asprintf(&tmp, "%s/%u_%lld/%s", trace_path, buf->pid, buf->pidunique, buf->name);
        result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL, 00600);
        if(result == -1) {
                PERROR("open");
This page took 0.043494 seconds and 4 git commands to generate.