From: Pierre-Marc Fournier Date: Tue, 2 Feb 2010 02:12:40 +0000 (-0500) Subject: Convert buffering system to per-cpu X-Git-Tag: v1.9.1~760 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=204141ee9da22a244c9095287f4f1c513784b171;p=lttng-ust.git Convert buffering system to per-cpu The cpu count is fixed at trace allocation. If the current cpu happens to be out of range, the event is put in the buffer of cpu0. --- diff --git a/libust/buffers.c b/libust/buffers.c index 504a79da..13660a89 100644 --- a/libust/buffers.c +++ b/libust/buffers.c @@ -20,6 +20,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include #include #include #include @@ -35,6 +36,25 @@ static DEFINE_MUTEX(ust_buffers_channels_mutex); static LIST_HEAD(ust_buffers_channels); +static int get_n_cpus(void) +{ + int result; + static int n_cpus = 0; + + if(n_cpus) { + return n_cpus; + } + + result = sysconf(_SC_NPROCESSORS_ONLN); + if(result == -1) { + return -1; + } + + n_cpus = result; + + return result; +} + static int ust_buffers_init_buffer(struct ltt_trace_struct *trace, struct ust_channel *ltt_chan, struct ust_buffer *buf, @@ -57,6 +77,7 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size) return -1; } + /* FIXME: should have matching call to shmdt */ ptr = shmat(buf->shmid, NULL, 0); if(ptr == (void *) -1) { perror("shmat"); @@ -86,20 +107,19 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size) return -1; } -static struct ust_buffer *ust_buffers_create_buf(struct ust_channel *channel) +int ust_buffers_create_buf(struct ust_channel *channel, int cpu) { int result; + struct ust_buffer *buf = channel->buf[cpu]; - result = ust_buffers_alloc_buf(channel->buf, &channel->alloc_size); + buf->cpu = cpu; + result = ust_buffers_alloc_buf(buf, &channel->alloc_size); if(result) - goto free_buf; + return -1; - ((struct ust_buffer *)channel->buf)->chan = channel; + buf->chan = channel; kref_get(&channel->kref); - return channel->buf; - -free_buf: - return NULL; + return 0; } static void ust_buffers_destroy_channel(struct kref *kref) @@ -118,6 +138,7 @@ static void ust_buffers_destroy_buf(struct ust_buffer *buf) PERROR("munmap"); } +//ust// chan->buf[buf->cpu] = NULL; free(buf); kref_put(&chan->kref, ust_buffers_destroy_channel); } @@ -129,23 +150,21 @@ static void ust_buffers_remove_buf(struct kref *kref) ust_buffers_destroy_buf(buf); } -static struct ust_buffer *ust_buffers_open_buf(struct ust_channel *chan) +int ust_buffers_open_buf(struct ust_channel *chan, int cpu) { - struct ust_buffer *buf = NULL; - int err; - - buf = ust_buffers_create_buf(chan); - if (!buf) - return NULL; + int result; - kref_init(&buf->kref); + result = ust_buffers_create_buf(chan, cpu); + if (result == -1) + return -1; - err = ust_buffers_init_buffer(chan->trace, chan, buf, chan->subbuf_cnt); + kref_init(&chan->buf[cpu]->kref); - if (err) - return ERR_PTR(err); + result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt); + if(result == -1) + return -1; - return buf; + return 0; /* FIXME: decrementally destroy on error? */ } @@ -161,6 +180,9 @@ static void ust_buffers_close_buf(struct ust_buffer *buf) int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt) { + int i; + int result; + if(subbuf_size == 0 || subbuf_cnt == 0) return -1; @@ -169,18 +191,27 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_ chan->subbuf_size = subbuf_size; chan->subbuf_size_order = get_count_order(subbuf_size); chan->alloc_size = FIX_SIZE(subbuf_size * subbuf_cnt); + kref_init(&chan->kref); mutex_lock(&ust_buffers_channels_mutex); - chan->buf = ust_buffers_open_buf(chan); - if (!chan->buf) - goto error; + for(i=0; in_cpus; i++) { + result = ust_buffers_open_buf(chan, i); + if (result == -1) + goto error; + } list_add(&chan->list, &ust_buffers_channels); mutex_unlock(&ust_buffers_channels_mutex); return 0; - error: + /* Jump directly inside the loop to close the buffers that were already + * opened. */ + for(; i>=0; i--) { + ust_buffers_close_buf(chan->buf[i]); +error: + } + kref_put(&chan->kref, ust_buffers_destroy_channel); mutex_unlock(&ust_buffers_channels_mutex); return -1; @@ -188,12 +219,17 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_ void ust_buffers_channel_close(struct ust_channel *chan) { - if (!chan) + int i; + if(!chan) return; mutex_lock(&ust_buffers_channels_mutex); - if (chan->buf) - ust_buffers_close_buf(chan->buf); + for(i=0; in_cpus; i++) { + /* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't + * initialize to NULL so we cannot use this check. Should we? */ +//ust// if (chan->buf[i]) + ust_buffers_close_buf(chan->buf[i]); + } list_del(&chan->list); kref_put(&chan->kref, ust_buffers_destroy_channel); @@ -216,6 +252,7 @@ void _ust_buffers_write(struct ust_buffer *buf, size_t offset, len -= cpy; src += cpy; offset += cpy; + WARN_ON(offset >= buf->buf_size); cpy = min_t(size_t, len, buf->buf_size - offset); @@ -223,16 +260,6 @@ void _ust_buffers_write(struct ust_buffer *buf, size_t offset, } while (unlikely(len != cpy)); } -/** - * ltt_buffers_offset_address - get address of a location within the buffer - * @buf : buffer - * @offset : offset within the buffer. - * - * Return the address where a given offset is located. - * Should be used to get the current subbuffer header pointer. Given we know - * it's never on a page boundary, it's safe to write directly to this address, - * as long as the write is never bigger than a page size. - */ void *ltt_buffers_offset_address(struct ust_buffer *buf, size_t offset) { return ((char *)buf->buf_data)+offset; @@ -289,7 +316,7 @@ static inline int last_tsc_overflow(struct ust_buffer *ltt_buf, */ enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH }; -static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan); +static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu); static void ltt_force_switch(struct ust_buffer *buf, enum force_switch_mode mode); @@ -449,9 +476,9 @@ int ust_buffers_do_put_subbuf(struct ust_buffer *buf, u32 uconsumed_old) static void ltt_relay_print_subbuffer_errors( struct ust_channel *channel, - long cons_off) + long cons_off, int cpu) { - struct ust_buffer *ltt_buf = channel->buf; + struct ust_buffer *ltt_buf = channel->buf[cpu]; long cons_idx, commit_count, write_offset; cons_idx = SUBBUF_INDEX(cons_off, channel); @@ -477,9 +504,9 @@ static void ltt_relay_print_subbuffer_errors( } static void ltt_relay_print_errors(struct ltt_trace_struct *trace, - struct ust_channel *channel) + struct ust_channel *channel, int cpu) { - struct ust_buffer *ltt_buf = channel->buf; + struct ust_buffer *ltt_buf = channel->buf[cpu]; long cons_off; /* @@ -494,13 +521,13 @@ static void ltt_relay_print_errors(struct ltt_trace_struct *trace, channel) - cons_off) > 0; cons_off = SUBBUF_ALIGN(cons_off, channel)) - ltt_relay_print_subbuffer_errors(channel, cons_off); + ltt_relay_print_subbuffer_errors(channel, cons_off, cpu); } -static void ltt_relay_print_buffer_errors(struct ust_channel *channel) +static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu) { struct ltt_trace_struct *trace = channel->trace; - struct ust_buffer *ltt_buf = channel->buf; + struct ust_buffer *ltt_buf = channel->buf[cpu]; if (local_read(<t_buf->events_lost)) ERR("channel %s: %ld events lost", @@ -511,7 +538,7 @@ static void ltt_relay_print_buffer_errors(struct ust_channel *channel) channel->channel_name, local_read(<t_buf->corrupted_subbuffers)); - ltt_relay_print_errors(trace, channel); + ltt_relay_print_errors(trace, channel, cpu); } static void ltt_relay_release_channel(struct kref *kref) @@ -616,14 +643,14 @@ static int ust_buffers_init_buffer(struct ltt_trace_struct *trace, } /* FIXME: use this function */ -static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan) +static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu) { struct ltt_trace_struct *trace = ltt_chan->trace; - struct ust_buffer *ltt_buf = ltt_chan->buf; + struct ust_buffer *ltt_buf = ltt_chan->buf[cpu]; kref_put(<t_chan->trace->ltt_transport_kref, ltt_release_transport); - ltt_relay_print_buffer_errors(ltt_chan); + ltt_relay_print_buffer_errors(ltt_chan, cpu); //ust// free(ltt_buf->commit_seq); kfree(ltt_buf->commit_count); ltt_buf->commit_count = NULL; @@ -632,47 +659,59 @@ static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan) //ust// wake_up_interruptible(&trace->kref_wq); } -static void ltt_chan_alloc_ltt_buf(struct ust_channel *chan) +static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan) { void *ptr; int result; + size_t size; + int i; - /* Get one page */ - /* FIXME: increase size if we have a seq_commit array that overflows the page */ - size_t size = PAGE_ALIGN(1); + size = PAGE_ALIGN(1); - result = chan->buf_shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700); - if(chan->buf_shmid == -1) { - PERROR("shmget"); - return; - } + for(i=0; in_cpus; i++) { - ptr = shmat(chan->buf_shmid, NULL, 0); - if(ptr == (void *) -1) { - perror("shmat"); - goto destroy_shmem; - } + result = chan->buf_struct_shmids[i] = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700); + if(result == -1) { + PERROR("shmget"); + goto destroy_previous; + } - /* Already mark the shared memory for destruction. This will occur only - * when all users have detached. - */ - result = shmctl(chan->buf_shmid, IPC_RMID, NULL); - if(result == -1) { - perror("shmctl"); - return; + /* FIXME: should have matching call to shmdt */ + ptr = shmat(chan->buf_struct_shmids[i], NULL, 0); + if(ptr == (void *) -1) { + perror("shmat"); + goto destroy_shm; + } + + /* Already mark the shared memory for destruction. This will occur only + * when all users have detached. + */ + result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL); + if(result == -1) { + perror("shmctl"); + goto destroy_previous; + } + + chan->buf[i] = ptr; } - chan->buf = ptr; + return 0; - return; + /* Jumping inside this loop occurs from within the other loop above with i as + * counter, so it unallocates the structures for the cpu = current_i down to + * zero. */ + for(; i>=0; i--) { + destroy_shm: + result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL); + if(result == -1) { + perror("shmctl"); + } - destroy_shmem: - result = shmctl(chan->buf_shmid, IPC_RMID, NULL); - if(result == -1) { - perror("shmctl"); + destroy_previous: + continue; } - return; + return -1; } /* @@ -682,7 +721,6 @@ static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_s const char *channel_name, struct ust_channel *ltt_chan, unsigned int subbuf_size, unsigned int n_subbufs, int overwrite) { - int err = 0; int result; kref_init(<t_chan->kref); @@ -693,29 +731,40 @@ static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_s ltt_chan->overwrite = overwrite; ltt_chan->n_subbufs_order = get_count_order(n_subbufs); ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order); + ltt_chan->n_cpus = get_n_cpus(); //ust// ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map); + ltt_chan->buf = (void *) malloc(ltt_chan->n_cpus * sizeof(void *)); + if(ltt_chan->buf == NULL) { + goto error; + } + ltt_chan->buf_struct_shmids = (int *) malloc(ltt_chan->n_cpus * sizeof(int)); + if(ltt_chan->buf_struct_shmids == NULL) + goto free_buf; - ltt_chan_alloc_ltt_buf(ltt_chan); + result = ust_buffers_alloc_channel_buf_structs(ltt_chan); + if(result != 0) { + goto free_buf_struct_shmids; + } -//ust// ltt_chan->buf = malloc(sizeof(struct ltt_channel_buf_struct)); - if (!ltt_chan->buf) - goto alloc_error; - /* FIXME: handle error of this call */ result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs); - if (result == -1) { + if (result != 0) { ERR("Cannot open channel for trace %s", trace_name); - goto relay_open_error; + goto unalloc_buf_structs; } - err = 0; - goto end; + return 0; + +unalloc_buf_structs: + /* FIXME: put a call here to unalloc the buf structs! */ + +free_buf_struct_shmids: + free(ltt_chan->buf_struct_shmids); -relay_open_error: -//ust// percpu_free(ltt_chan->buf); -alloc_error: - err = EPERM; -end: - return err; +free_buf: + free(ltt_chan->buf); + +error: + return -1; } /* @@ -754,12 +803,12 @@ static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel) //ust// } } -static void ltt_relay_finish_buffer(struct ust_channel *channel) +static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cpu) { // int result; - if (channel->buf) { - struct ust_buffer *buf = channel->buf; + if (channel->buf[cpu]) { + struct ust_buffer *buf = channel->buf[cpu]; ltt_relay_buffer_flush(buf); //ust// ltt_relay_wake_writers(ltt_buf); /* closing the pipe tells the consumer the buffer is finished */ @@ -776,10 +825,11 @@ static void ltt_relay_finish_buffer(struct ust_channel *channel) static void ltt_relay_finish_channel(struct ust_channel *channel) { -//ust// unsigned int i; + unsigned int i; -//ust// for_each_possible_cpu(i) - ltt_relay_finish_buffer(channel); + for(i=0; in_cpus; i++) { + ltt_relay_finish_buffer(channel, i); + } } static void ltt_relay_remove_channel(struct ust_channel *channel) @@ -1182,9 +1232,9 @@ static inline void ltt_reserve_end_switch_current( static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace, struct ust_channel *channel, void **transport_data, size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc, - unsigned int *rflags, int largest_align) + unsigned int *rflags, int largest_align, int cpu) { - struct ust_buffer *buf = *transport_data = channel->buf; + struct ust_buffer *buf = *transport_data = channel->buf[cpu]; struct ltt_reserve_switch_offsets offsets; offsets.reserve_commit_diff = 0; diff --git a/libust/buffers.h b/libust/buffers.h index 01912628..c6b13d07 100644 --- a/libust/buffers.h +++ b/libust/buffers.h @@ -69,6 +69,7 @@ struct ust_buffer { void *buf_data; size_t buf_size; int shmid; + unsigned int cpu; /* commit count per subbuffer; must be at end of struct */ local_t commit_seq[0] ____cacheline_aligned; diff --git a/libust/channels.h b/libust/channels.h index 893d8c11..6f57f13d 100644 --- a/libust/channels.h +++ b/libust/channels.h @@ -28,6 +28,7 @@ #include #define EVENTS_PER_CHANNEL 65536 +#define MAX_CPUS 32 struct ltt_trace_struct; @@ -36,7 +37,8 @@ struct ust_buffer; struct ust_channel { /* First 32 bytes cache-hot cacheline */ struct ltt_trace_struct *trace; - void *buf; + int *buf_struct_shmids; + struct ust_buffer **buf; int overwrite:1; int active:1; unsigned int n_subbufs_order; @@ -65,8 +67,7 @@ struct ust_channel { int subbuf_size_order; unsigned int subbuf_cnt; const char *channel_name; - - int buf_shmid; + int n_cpus; u32 version; size_t alloc_size; diff --git a/libust/serialize.c b/libust/serialize.c index d603815b..9cb6bcae 100644 --- a/libust/serialize.c +++ b/libust/serialize.c @@ -24,6 +24,9 @@ * va_list * to ltt_vtrace. */ +#define _GNU_SOURCE +#include +#include #include #include #include @@ -46,6 +49,11 @@ enum ltt_type { LTT_TYPE_NONE, }; +static int ust_get_cpu(void) +{ + return sched_getcpu(); +} + #define LTT_ATTRIBUTE_NETWORK_BYTE_ORDER (1<<1) /* @@ -684,7 +692,7 @@ notrace void ltt_vtrace(const struct marker *mdata, void *probe_data, struct ltt_serialize_closure closure; struct ltt_probe_private_data *private_data = call_data; void *serialize_private = NULL; -//ust// int cpu; + int cpu; unsigned int rflags; /* @@ -696,7 +704,9 @@ notrace void ltt_vtrace(const struct marker *mdata, void *probe_data, rcu_read_lock(); //ust// rcu_read_lock_sched_notrace(); //ust// cpu = smp_processor_id(); + cpu = ust_get_cpu(); //ust// __get_cpu_var(ltt_nesting)++; + /* FIXME: should nesting be per-cpu? */ ltt_nesting++; pdata = (struct ltt_active_marker *)probe_data; @@ -750,18 +760,26 @@ notrace void ltt_vtrace(const struct marker *mdata, void *probe_data, if (!channel->active) continue; + /* If a new cpu was plugged since the trace was started, we did + * not add it to the trace, and therefore we write the event to + * cpu 0. + */ + if(cpu >= channel->n_cpus) { + cpu = 0; + } + /* reserve space : header and data */ ret = ltt_reserve_slot(trace, channel, &transport_data, data_size, &slot_size, &buf_offset, &tsc, &rflags, - largest_align); + largest_align, cpu); if (unlikely(ret < 0)) continue; /* buffer full */ va_copy(args_copy, *args); /* FIXME : could probably encapsulate transport better. */ //ust// buf = ((struct rchan *)channel->trans_channel_data)->buf[cpu]; - buf = channel->buf; + buf = channel->buf[cpu]; /* Out-of-order write : header and data */ buf_offset = ltt_write_event_header(trace, buf, buf_offset, diff --git a/libust/tracectl.c b/libust/tracectl.c index 584a94cf..9a996d34 100644 --- a/libust/tracectl.c +++ b/libust/tracectl.c @@ -165,7 +165,7 @@ void notif_cb(void) static void inform_consumer_daemon(const char *trace_name) { - int i; + int i,j; struct ltt_trace_struct *trace; pid_t pid = getpid(); int result; @@ -179,12 +179,18 @@ static void inform_consumer_daemon(const char *trace_name) } for(i=0; i < trace->nr_channels; i++) { - result = ustcomm_request_consumer(pid, trace->channels[i].channel_name); - if(result == -1) { - WARN("Failed to request collection for channel %s. Is the daemon available?", trace->channels[i].channel_name); - /* continue even if fail */ + /* iterate on all cpus */ + for(j=0; jchannels[i].n_cpus; j++) { + char *buf; + asprintf(&buf, "%s_%d", trace->channels[i].channel_name, j); + result = ustcomm_request_consumer(pid, buf); + if(result == -1) { + WARN("Failed to request collection for channel %s. Is the daemon available?", trace->channels[i].channel_name); + /* continue even if fail */ + } + free(buf); + buffers_to_export++; } - buffers_to_export++; } finish: @@ -273,6 +279,408 @@ void process_blocked_consumers(void) } +void seperate_channel_cpu(const char *channel_and_cpu, char **channel, int *cpu) +{ + const char *sep; + + sep = rindex(channel_and_cpu, '_'); + if(sep == NULL) { + *cpu = -1; + sep = channel_and_cpu + strlen(channel_and_cpu); + } + else { + *cpu = atoi(sep+1); + } + + asprintf(channel, "%.*s", (int)(sep-channel_and_cpu), channel_and_cpu); +} + +static int do_cmd_get_shmid(const char *recvbuf, struct ustcomm_source *src) +{ + int retval = 0; + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_and_cpu; + int found = 0; + int result; + char *ch_name; + int ch_cpu; + + DBG("get_shmid"); + + channel_and_cpu = nth_token(recvbuf, 1); + if(channel_and_cpu == NULL) { + ERR("get_shmid: cannot parse channel"); + goto end; + } + + seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu); + if(ch_cpu == -1) { + ERR("Problem parsing channel name"); + goto free_short_chan_name; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + ERR("cannot find trace!"); + retval = -1; + goto free_short_chan_name; + } + + for(i=0; inr_channels; i++) { + struct ust_channel *channel = &trace->channels[i]; + struct ust_buffer *buf = channel->buf[ch_cpu]; + + if(!strcmp(trace->channels[i].channel_name, ch_name)) { + char *reply; + +// DBG("the shmid for the requested channel is %d", buf->shmid); +// DBG("the shmid for its buffer structure is %d", channel->buf_struct_shmids); + asprintf(&reply, "%d %d", buf->shmid, channel->buf_struct_shmids[ch_cpu]); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, src); + if(result) { + ERR("listener: get_shmid: ustcomm_send_reply failed"); + free(reply); + retval = -1; + goto free_short_chan_name; + } + + free(reply); + + found = 1; + break; + } + } + + if(found) { + buffers_to_export--; + } + else { + ERR("get_shmid: channel not found (%s)", channel_and_cpu); + } + + free_short_chan_name: + free(ch_name); + + end: + return retval; +} + +static int do_cmd_get_n_subbufs(const char *recvbuf, struct ustcomm_source *src) +{ + int retval = 0; + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_and_cpu; + int found = 0; + int result; + char *ch_name; + int ch_cpu; + + DBG("get_n_subbufs"); + + channel_and_cpu = nth_token(recvbuf, 1); + if(channel_and_cpu == NULL) { + ERR("get_n_subbufs: cannot parse channel"); + goto end; + } + + seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu); + if(ch_cpu == -1) { + ERR("Problem parsing channel name"); + goto free_short_chan_name; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + ERR("cannot find trace!"); + retval = -1; + goto free_short_chan_name; + } + + for(i=0; inr_channels; i++) { + struct ust_channel *channel = &trace->channels[i]; + + if(!strcmp(trace->channels[i].channel_name, ch_name)) { + char *reply; + + DBG("the n_subbufs for the requested channel is %d", channel->subbuf_cnt); + asprintf(&reply, "%d", channel->subbuf_cnt); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, src); + if(result) { + ERR("listener: get_n_subbufs: ustcomm_send_reply failed"); + free(reply); + retval = -1; + goto free_short_chan_name; + } + + free(reply); + found = 1; + break; + } + } + if(found == 0) { + ERR("get_n_subbufs: unable to find channel"); + } + + free_short_chan_name: + free(ch_name); + + end: + return retval; +} + +static int do_cmd_get_subbuf_size(const char *recvbuf, struct ustcomm_source *src) +{ + int retval = 0; + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_and_cpu; + int found = 0; + int result; + char *ch_name; + int ch_cpu; + + DBG("get_subbuf_size"); + + channel_and_cpu = nth_token(recvbuf, 1); + if(channel_and_cpu == NULL) { + ERR("get_subbuf_size: cannot parse channel"); + goto end; + } + + seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu); + if(ch_cpu == -1) { + ERR("Problem parsing channel name"); + goto free_short_chan_name; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + ERR("cannot find trace!"); + retval = -1; + goto free_short_chan_name; + } + + for(i=0; inr_channels; i++) { + struct ust_channel *channel = &trace->channels[i]; + + if(!strcmp(trace->channels[i].channel_name, ch_name)) { + char *reply; + + DBG("the subbuf_size for the requested channel is %zd", channel->subbuf_size); + asprintf(&reply, "%zd", channel->subbuf_size); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, src); + if(result) { + ERR("listener: get_subbuf_size: ustcomm_send_reply failed"); + free(reply); + retval = -1; + goto free_short_chan_name; + } + + free(reply); + found = 1; + break; + } + } + if(found == 0) { + ERR("get_subbuf_size: unable to find channel"); + } + + free_short_chan_name: + free(ch_name); + + end: + return retval; +} + +static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src) +{ + int retval = 0; + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_and_cpu; + int found = 0; + char *ch_name; + int ch_cpu; + + DBG("get_subbuf"); + + channel_and_cpu = nth_token(recvbuf, 1); + if(channel_and_cpu == NULL) { + ERR("get_subbuf: cannot parse channel"); + goto end; + } + + seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu); + if(ch_cpu == -1) { + ERR("Problem parsing channel name"); + goto free_short_chan_name; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + ERR("cannot find trace!"); + retval = -1; + goto free_short_chan_name; + } + + for(i=0; inr_channels; i++) { + struct ust_channel *channel = &trace->channels[i]; + + if(!strcmp(trace->channels[i].channel_name, ch_name)) { + struct ust_buffer *buf = channel->buf[ch_cpu]; + struct blocked_consumer *bc; + + found = 1; + + bc = (struct blocked_consumer *) malloc(sizeof(struct blocked_consumer)); + if(bc == NULL) { + ERR("malloc returned NULL"); + goto free_short_chan_name; + } + bc->fd_consumer = src->fd; + bc->fd_producer = buf->data_ready_fd_read; + bc->buf = buf; + bc->src = *src; + bc->server = ustcomm_app.server; + + list_add(&bc->list, &blocked_consumers); + + break; + } + } + if(found == 0) { + ERR("get_subbuf: unable to find channel"); + } + + free_short_chan_name: + free(ch_name); + + end: + return retval; +} + +static int do_cmd_put_subbuffer(const char *recvbuf, struct ustcomm_source *src) +{ + int retval = 0; + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_and_cpu; + int found = 0; + int result; + char *ch_name; + int ch_cpu; + long consumed_old; + char *consumed_old_str; + char *endptr; + + DBG("put_subbuf"); + + channel_and_cpu = strdup_malloc(nth_token(recvbuf, 1)); + if(channel_and_cpu == NULL) { + ERR("put_subbuf_size: cannot parse channel"); + retval = -1; + goto end; + } + + consumed_old_str = strdup_malloc(nth_token(recvbuf, 2)); + if(consumed_old_str == NULL) { + ERR("put_subbuf: cannot parse consumed_old"); + retval = -1; + goto free_channel_and_cpu; + } + consumed_old = strtol(consumed_old_str, &endptr, 10); + if(*endptr != '\0') { + ERR("put_subbuf: invalid value for consumed_old"); + retval = -1; + goto free_consumed_old_str; + } + + seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu); + if(ch_cpu == -1) { + ERR("Problem parsing channel name"); + retval = -1; + goto free_short_chan_name; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + ERR("cannot find trace!"); + retval = -1; + goto free_short_chan_name; + } + + for(i=0; inr_channels; i++) { + struct ust_channel *channel = &trace->channels[i]; + + if(!strcmp(trace->channels[i].channel_name, ch_name)) { + struct ust_buffer *buf = channel->buf[ch_cpu]; + char *reply; + long consumed_old=0; + + found = 1; + + result = ust_buffers_do_put_subbuf(buf, consumed_old); + if(result < 0) { + WARN("ust_buffers_do_put_subbuf: error (subbuf=%s)", channel_and_cpu); + asprintf(&reply, "%s", "ERROR"); + } + else { + DBG("ust_buffers_do_put_subbuf: success (subbuf=%s)", channel_and_cpu); + asprintf(&reply, "%s", "OK"); + } + + result = ustcomm_send_reply(&ustcomm_app.server, reply, src); + if(result) { + ERR("listener: put_subbuf: ustcomm_send_reply failed"); + free(reply); + retval = -1; + goto free_channel_and_cpu; + } + + free(reply); + break; + } + } + if(found == 0) { + ERR("get_subbuf_size: unable to find channel"); + } + + free_channel_and_cpu: + free(channel_and_cpu); + free_consumed_old_str: + free(consumed_old_str); + free_short_chan_name: + free(ch_name); + + end: + return retval; +} + void *listener_main(void *p) { int result; @@ -423,140 +831,13 @@ void *listener_main(void *p) } } else if(nth_token_is(recvbuf, "get_shmid", 0) == 1) { - struct ltt_trace_struct *trace; - char trace_name[] = "auto"; - int i; - char *channel_name; - - DBG("get_shmid"); - - channel_name = nth_token(recvbuf, 1); - if(channel_name == NULL) { - ERR("get_shmid: cannot parse channel"); - goto next_cmd; - } - - ltt_lock_traces(); - trace = _ltt_trace_find(trace_name); - ltt_unlock_traces(); - - if(trace == NULL) { - ERR("cannot find trace!"); - return (void *)1; - } - - for(i=0; inr_channels; i++) { - struct ust_channel *channel = &trace->channels[i]; - struct ust_buffer *buf = channel->buf; - - if(!strcmp(trace->channels[i].channel_name, channel_name)) { - char *reply; - - DBG("the shmid for the requested channel is %d", buf->shmid); - DBG("the shmid for its buffer structure is %d", channel->buf_shmid); - asprintf(&reply, "%d %d", buf->shmid, channel->buf_shmid); - - result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); - if(result) { - ERR("listener: get_shmid: ustcomm_send_reply failed"); - goto next_cmd; - } - - free(reply); - - break; - } - } - - buffers_to_export--; + do_cmd_get_shmid(recvbuf, &src); } else if(nth_token_is(recvbuf, "get_n_subbufs", 0) == 1) { - struct ltt_trace_struct *trace; - char trace_name[] = "auto"; - int i; - char *channel_name; - - DBG("get_n_subbufs"); - - channel_name = nth_token(recvbuf, 1); - if(channel_name == NULL) { - ERR("get_n_subbufs: cannot parse channel"); - goto next_cmd; - } - - ltt_lock_traces(); - trace = _ltt_trace_find(trace_name); - ltt_unlock_traces(); - - if(trace == NULL) { - ERR("cannot find trace!"); - return (void *)1; - } - - for(i=0; inr_channels; i++) { - struct ust_channel *channel = &trace->channels[i]; - - if(!strcmp(trace->channels[i].channel_name, channel_name)) { - char *reply; - - DBG("the n_subbufs for the requested channel is %d", channel->subbuf_cnt); - asprintf(&reply, "%d", channel->subbuf_cnt); - - result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); - if(result) { - ERR("listener: get_n_subbufs: ustcomm_send_reply failed"); - goto next_cmd; - } - - free(reply); - - break; - } - } + do_cmd_get_n_subbufs(recvbuf, &src); } else if(nth_token_is(recvbuf, "get_subbuf_size", 0) == 1) { - struct ltt_trace_struct *trace; - char trace_name[] = "auto"; - int i; - char *channel_name; - - DBG("get_subbuf_size"); - - channel_name = nth_token(recvbuf, 1); - if(channel_name == NULL) { - ERR("get_subbuf_size: cannot parse channel"); - goto next_cmd; - } - - ltt_lock_traces(); - trace = _ltt_trace_find(trace_name); - ltt_unlock_traces(); - - if(trace == NULL) { - ERR("cannot find trace!"); - return (void *)1; - } - - for(i=0; inr_channels; i++) { - struct ust_channel *channel = &trace->channels[i]; - - if(!strcmp(trace->channels[i].channel_name, channel_name)) { - char *reply; - - DBG("the subbuf_size for the requested channel is %zd", channel->subbuf_size); - asprintf(&reply, "%zd", channel->subbuf_size); - - result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); - if(result) { - ERR("listener: get_subbuf_size: ustcomm_send_reply failed"); - goto next_cmd; - } - - free(reply); - - break; - } - } + do_cmd_get_subbuf_size(recvbuf, &src); } else if(nth_token_is(recvbuf, "load_probe_lib", 0) == 1) { char *libfile; @@ -564,123 +845,14 @@ void *listener_main(void *p) libfile = nth_token(recvbuf, 1); DBG("load_probe_lib loading %s", libfile); + + free(libfile); } else if(nth_token_is(recvbuf, "get_subbuffer", 0) == 1) { - struct ltt_trace_struct *trace; - char trace_name[] = "auto"; - int i; - char *channel_name; - - DBG("get_subbuf"); - - channel_name = nth_token(recvbuf, 1); - if(channel_name == NULL) { - ERR("get_subbuf: cannot parse channel"); - goto next_cmd; - } - - ltt_lock_traces(); - trace = _ltt_trace_find(trace_name); - ltt_unlock_traces(); - - if(trace == NULL) { - ERR("cannot find trace!"); - return (void *)1; - } - - for(i=0; inr_channels; i++) { - struct ust_channel *channel = &trace->channels[i]; - - if(!strcmp(trace->channels[i].channel_name, channel_name)) { - struct ust_buffer *buf = channel->buf; - struct blocked_consumer *bc; - - bc = (struct blocked_consumer *) malloc(sizeof(struct blocked_consumer)); - if(bc == NULL) { - ERR("malloc returned NULL"); - goto next_cmd; - } - bc->fd_consumer = src.fd; - bc->fd_producer = buf->data_ready_fd_read; - bc->buf = buf; - bc->src = src; - bc->server = ustcomm_app.server; - - list_add(&bc->list, &blocked_consumers); - - break; - } - } + do_cmd_get_subbuffer(recvbuf, &src); } else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) { - struct ltt_trace_struct *trace; - char trace_name[] = "auto"; - int i; - char *channel_name; - long consumed_old; - char *consumed_old_str; - char *endptr; - - DBG("put_subbuf"); - - channel_name = strdup_malloc(nth_token(recvbuf, 1)); - if(channel_name == NULL) { - ERR("put_subbuf_size: cannot parse channel"); - goto next_cmd; - } - - consumed_old_str = strdup_malloc(nth_token(recvbuf, 2)); - if(consumed_old_str == NULL) { - ERR("put_subbuf: cannot parse consumed_old"); - goto next_cmd; - } - consumed_old = strtol(consumed_old_str, &endptr, 10); - if(*endptr != '\0') { - ERR("put_subbuf: invalid value for consumed_old"); - goto next_cmd; - } - - ltt_lock_traces(); - trace = _ltt_trace_find(trace_name); - ltt_unlock_traces(); - - if(trace == NULL) { - ERR("cannot find trace!"); - return (void *)1; - } - - for(i=0; inr_channels; i++) { - struct ust_channel *channel = &trace->channels[i]; - - if(!strcmp(trace->channels[i].channel_name, channel_name)) { - struct ust_buffer *buf = channel->buf; - char *reply; - long consumed_old=0; - - result = ust_buffers_do_put_subbuf(buf, consumed_old); - if(result < 0) { - WARN("ust_buffers_do_put_subbuf: error (subbuf=%s)", channel_name); - asprintf(&reply, "%s", "ERROR"); - } - else { - DBG("ust_buffers_do_put_subbuf: success (subbuf=%s)", channel_name); - asprintf(&reply, "%s", "OK"); - } - - result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); - if(result) { - ERR("listener: put_subbuf: ustcomm_send_reply failed"); - goto next_cmd; - } - - free(reply); - - break; - } - } - - free(channel_name); - free(consumed_old_str); + do_cmd_put_subbuffer(recvbuf, &src); } else if(nth_token_is(recvbuf, "enable_marker", 0) == 1) { char *channel_slash_name = nth_token(recvbuf, 1); @@ -1190,7 +1362,6 @@ void ust_before_fork(ust_fork_info_t *fork_info) static void ust_after_fork_common(ust_fork_info_t *fork_info) { int result; - sigset_t orig_sigs; /* Restore signals */ result = sigprocmask(SIG_SETMASK, &fork_info->orig_sigs, NULL); diff --git a/libust/tracer.c b/libust/tracer.c index aec55f43..5e4e1c09 100644 --- a/libust/tracer.c +++ b/libust/tracer.c @@ -680,7 +680,7 @@ int ltt_trace_alloc(const char *trace_name) //ust// local_irq_save(flags); trace->start_freq = trace_clock_frequency(); trace->start_tsc = trace_clock_read64(); - gettimeofday(&trace->start_time, NULL); //ust// changed + gettimeofday(&trace->start_time, NULL); //ust// changed /* FIXME: is this ok? */ //ust// local_irq_restore(flags); for (chan = 0; chan < trace->nr_channels; chan++) { diff --git a/libust/tracer.h b/libust/tracer.h index b0767543..b1fbf107 100644 --- a/libust/tracer.h +++ b/libust/tracer.h @@ -133,7 +133,7 @@ struct ltt_trace_ops { void **transport_data, size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc, unsigned int *rflags, - int largest_align); + int largest_align, int cpu); //ust// void (*commit_slot) (struct ltt_channel_struct *channel, //ust// void **transport_data, long buf_offset, //ust// size_t slot_size); @@ -154,7 +154,7 @@ struct ltt_trace_ops { void (*remove_channel) (struct ust_channel *channel); void (*user_errors) (struct ltt_trace_struct *trace, unsigned int index, size_t data_size, - struct user_dbg_data *dbg); + struct user_dbg_data *dbg, unsigned int cpu); } ____cacheline_aligned; struct ltt_transport { @@ -457,11 +457,11 @@ static __inline__ int ltt_reserve_slot( long *buf_offset, u64 *tsc, unsigned int *rflags, - int largest_align) + int largest_align, int cpu) { return trace->ops->reserve_slot(trace, channel, transport_data, data_size, slot_size, buf_offset, tsc, rflags, - largest_align); + largest_align, cpu); } diff --git a/ustd/ustd.c b/ustd/ustd.c index 26f87a2b..969c192a 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -259,7 +259,7 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); if(result != 2) { - ERR("unable to parse response to get_shmid"); + ERR("unable to parse response to get_shmid (\"%s\")", received_msg); return NULL; } free(received_msg); @@ -342,7 +342,7 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) } free(tmp); - asprintf(&tmp, "%s/%u_%lld/%s_0", trace_path, buf->pid, buf->pidunique, buf->name); + asprintf(&tmp, "%s/%u_%lld/%s", trace_path, buf->pid, buf->pidunique, buf->name); result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL, 00600); if(result == -1) { PERROR("open");