From: Nils Carlson Date: Tue, 30 Nov 2010 13:36:30 +0000 (+0100) Subject: Fix freeing of channels and buffers in buffers.c v2 X-Git-Tag: v0.10~6 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=ef15e552c145311c21bb89a1f5e9c6c040e1737f;p=ust.git Fix freeing of channels and buffers in buffers.c v2 Changes since v1: Handle new naming conventions buffers.c appeared to be very convoluted full of krefs and in general very bad shape. I have tried to make the freeing and unmapping of shared memory symmetrical in the sense that every open has a corresponding close and every map has a corresponding unmap. I have removed all use of krefs. Assuming no concurrency this should be fine. The kref usage was anyway broken. Signed-off-by: Nils Carlson --- diff --git a/libust/buffers.c b/libust/buffers.c index 3c8b943..e5a1db3 100644 --- a/libust/buffers.c +++ b/libust/buffers.c @@ -134,12 +134,23 @@ void _ust_buffers_strncpy_fixup(struct ust_buffer *buf, size_t offset, ust_buffers_do_memset(buf->buf_data + buf_offset, '\0', 1); } -static int ust_buffers_init_buffer(struct ust_trace *trace, - struct ust_channel *ltt_chan, - struct ust_buffer *buf, - unsigned int n_subbufs); +static void ltt_buffer_begin(struct ust_buffer *buf, + u64 tsc, unsigned int subbuf_idx) +{ + struct ust_channel *channel = buf->chan; + struct ltt_subbuffer_header *header = + (struct ltt_subbuffer_header *) + ust_buffers_offset_address(buf, + subbuf_idx * buf->chan->subbuf_size); -static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size) + header->cycle_count_begin = tsc; + header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */ + header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */ + /* FIXME: add memory barrier? */ + ltt_write_trace_header(channel->trace, header); +} + +static int map_buf_data(struct ust_buffer *buf, size_t *size) { void *ptr; int result; @@ -147,18 +158,16 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size) *size = PAGE_ALIGN(*size); result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700); - if(result == -1 && errno == EINVAL) { + if (result < 0 && errno == EINVAL) { ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased."); return -1; - } - else if(result == -1) { + } else if (result < 0) { PERROR("shmget"); return -1; } - /* FIXME: should have matching call to shmdt */ ptr = shmat(buf->shmid, NULL, 0); - if(ptr == (void *) -1) { + if (ptr == (void *) -1) { perror("shmat"); goto destroy_shmem; } @@ -177,7 +186,7 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size) return 0; - destroy_shmem: +destroy_shmem: result = shmctl(buf->shmid, IPC_RMID, NULL); if(result == -1) { perror("shmctl"); @@ -186,78 +195,97 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size) return -1; } -int ust_buffers_create_buf(struct ust_channel *channel, int cpu) +static int open_buf(struct ust_channel *chan, int cpu) { - int result; - struct ust_buffer *buf = channel->buf[cpu]; + int result, fds[2]; + unsigned int j; + struct ust_trace *trace = chan->trace; + struct ust_buffer *buf = chan->buf[cpu]; + unsigned int n_subbufs = chan->subbuf_cnt; - buf->cpu = cpu; - result = ust_buffers_alloc_buf(buf, &channel->alloc_size); - if(result) + + result = map_buf_data(buf, &chan->alloc_size); + if (result < 0) return -1; - buf->chan = channel; - urcu_ref_get(&channel->urcu_ref); - return 0; -} + buf->commit_count = + zmalloc(sizeof(*buf->commit_count) * n_subbufs); + if (!buf->commit_count) + goto unmap_buf; -static void ust_buffers_destroy_channel(struct urcu_ref *urcu_ref) -{ - struct ust_channel *chan = _ust_container_of(urcu_ref, struct ust_channel, urcu_ref); - free(chan); -} + result = pipe(fds); + if (result < 0) { + PERROR("pipe"); + goto free_commit_count; + } + buf->data_ready_fd_read = fds[0]; + buf->data_ready_fd_write = fds[1]; -static void ust_buffers_destroy_buf(struct ust_buffer *buf) -{ - struct ust_channel *chan = buf->chan; - int result; + buf->cpu = cpu; + buf->chan = chan; - result = munmap(buf->buf_data, buf->buf_size); - if(result == -1) { - PERROR("munmap"); + uatomic_set(&buf->offset, ltt_subbuffer_header_size()); + uatomic_set(&buf->consumed, 0); + uatomic_set(&buf->active_readers, 0); + for (j = 0; j < n_subbufs; j++) { + uatomic_set(&buf->commit_count[j].cc, 0); + uatomic_set(&buf->commit_count[j].cc_sb, 0); } -//ust// chan->buf[buf->cpu] = NULL; - free(buf); - urcu_ref_put(&chan->urcu_ref, ust_buffers_destroy_channel); -} + ltt_buffer_begin(buf, trace->start_tsc, 0); -/* called from urcu_ref_put */ -static void ust_buffers_remove_buf(struct urcu_ref *urcu_ref) -{ - struct ust_buffer *buf = _ust_container_of(urcu_ref, struct ust_buffer, urcu_ref); - ust_buffers_destroy_buf(buf); + uatomic_add(&buf->commit_count[0].cc, ltt_subbuffer_header_size()); + + uatomic_set(&buf->events_lost, 0); + uatomic_set(&buf->corrupted_subbuffers, 0); + + memset(buf->commit_seq, 0, sizeof(buf->commit_seq[0]) * n_subbufs); + + return 0; + +free_commit_count: + free(buf->commit_count); + +unmap_buf: + if (shmdt(buf->buf_data) < 0) { + PERROR("shmdt failed"); + } + + return -1; } -int ust_buffers_open_buf(struct ust_channel *chan, int cpu) +static void ltt_relay_print_buffer_errors(struct ust_channel *chan, int cpu); + +static void close_buf(struct ust_buffer *buf) { + struct ust_channel *chan = buf->chan; + int cpu = buf->cpu; int result; - result = ust_buffers_create_buf(chan, cpu); - if (result == -1) - return -1; + result = shmdt(buf->buf_data); + if (result < 0) { + PERROR("shmdt"); + } - urcu_ref_init(&chan->buf[cpu]->urcu_ref); + free(buf->commit_count); - result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt); - if(result == -1) - return -1; + result = close(buf->data_ready_fd_read); + if (result < 0) { + PERROR("close"); + } - return 0; + result = close(buf->data_ready_fd_write); + if (result < 0 && errno != EBADF) { + PERROR("close"); + } - /* FIXME: decrementally destroy on error? */ + /* FIXME: This spews out errors, are they real?: + * ltt_relay_print_buffer_errors(chan, cpu); */ } -/** - * ust_buffers_close_buf - close a channel buffer - * @buf: buffer - */ -static void ust_buffers_close_buf(struct ust_buffer *buf) -{ - urcu_ref_put(&buf->urcu_ref, ust_buffers_remove_buf); -} -int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt) +static int open_channel(struct ust_channel *chan, size_t subbuf_size, + size_t subbuf_cnt) { int i; int result; @@ -280,11 +308,9 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_ chan->subbuf_size_order = get_count_order(subbuf_size); chan->alloc_size = subbuf_size * subbuf_cnt; - urcu_ref_init(&chan->urcu_ref); - pthread_mutex_lock(&ust_buffers_channels_mutex); - for(i=0; in_cpus; i++) { - result = ust_buffers_open_buf(chan, i); + for (i=0; i < chan->n_cpus; i++) { + result = open_buf(chan, i); if (result == -1) goto error; } @@ -296,17 +322,16 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_ /* Jump directly inside the loop to close the buffers that were already * opened. */ for(; i>=0; i--) { - ust_buffers_close_buf(chan->buf[i]); + close_buf(chan->buf[i]); error: do {} while(0); } - urcu_ref_put(&chan->urcu_ref, ust_buffers_destroy_channel); pthread_mutex_unlock(&ust_buffers_channels_mutex); return -1; } -void ust_buffers_channel_close(struct ust_channel *chan) +static void close_channel(struct ust_channel *chan) { int i; if(!chan) @@ -317,41 +342,18 @@ void ust_buffers_channel_close(struct ust_channel *chan) /* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't * initialize to NULL so we cannot use this check. Should we? */ //ust// if (chan->buf[i]) - ust_buffers_close_buf(chan->buf[i]); + close_buf(chan->buf[i]); } cds_list_del(&chan->list); - urcu_ref_put(&chan->urcu_ref, ust_buffers_destroy_channel); + pthread_mutex_unlock(&ust_buffers_channels_mutex); } -/* - * ------- - */ - -static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu); - static void ltt_force_switch(struct ust_buffer *buf, enum force_switch_mode mode); -/* - * Trace callbacks - */ -static void ltt_buffer_begin(struct ust_buffer *buf, - u64 tsc, unsigned int subbuf_idx) -{ - struct ust_channel *channel = buf->chan; - struct ltt_subbuffer_header *header = - (struct ltt_subbuffer_header *) - ust_buffers_offset_address(buf, - subbuf_idx * buf->chan->subbuf_size); - header->cycle_count_begin = tsc; - header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */ - header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */ - /* FIXME: add memory cmm_barrier? */ - ltt_write_trace_header(channel->trace, header); -} /* * offset is assumed to never be 0 here : never deliver a completely empty @@ -429,7 +431,7 @@ int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed) * smp_mb() * * However, smp_call_function_single() does not seem to clearly execute - * such cmm_barriers. It depends on spinlock semantic to provide the cmm_barrier + * such barriers. It depends on spinlock semantic to provide the barrier * before executing the IPI and, when busy-looping, csd_lock_wait only * executes smp_mb() when it has to wait for the other CPU. * @@ -590,122 +592,7 @@ static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu) ltt_relay_print_errors(trace, channel, cpu); } -static void ltt_relay_release_channel(struct urcu_ref *urcu_ref) -{ - struct ust_channel *ltt_chan = _ust_container_of(urcu_ref, - struct ust_channel, urcu_ref); - free(ltt_chan->buf); -} - -/* - * Create ltt buffer. - */ -//ust// static int ltt_relay_create_buffer(struct ust_trace *trace, -//ust// struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf, -//ust// unsigned int cpu, unsigned int n_subbufs) -//ust// { -//ust// struct ltt_channel_buf_struct *ltt_buf = -//ust// percpu_ptr(ltt_chan->buf, cpu); -//ust// unsigned int j; -//ust// -//ust// ltt_buf->commit_count = -//ust// kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs, -//ust// GFP_KERNEL, cpu_to_node(cpu)); -//ust// if (!ltt_buf->commit_count) -//ust// return -ENOMEM; -//ust// kref_get(&trace->kref); -//ust// kref_get(&trace->ltt_transport_kref); -//ust// kref_get(<t_chan->kref); -//ust// uatomic_set(<t_buf->offset, ltt_subbuffer_header_size()); -//ust// uatomic_set(<t_buf->consumed, 0); -//ust// uatomic_set(<t_buf->active_readers, 0); -//ust// for (j = 0; j < n_subbufs; j++) -//ust// uatomic_set(<t_buf->commit_count[j], 0); -//ust// init_waitqueue_head(<t_buf->write_wait); -//ust// uatomic_set(<t_buf->wakeup_readers, 0); -//ust// spin_lock_init(<t_buf->full_lock); -//ust// -//ust// ltt_buffer_begin_callback(buf, trace->start_tsc, 0); -//ust// /* atomic_add made on local variable on data that belongs to -//ust// * various CPUs : ok because tracing not started (for this cpu). */ -//ust// uatomic_add(<t_buf->commit_count[0], ltt_subbuffer_header_size()); -//ust// -//ust// uatomic_set(<t_buf->events_lost, 0); -//ust// uatomic_set(<t_buf->corrupted_subbuffers, 0); -//ust// -//ust// return 0; -//ust// } - -static int ust_buffers_init_buffer(struct ust_trace *trace, - struct ust_channel *ltt_chan, struct ust_buffer *buf, - unsigned int n_subbufs) -{ - unsigned int j; - int fds[2]; - int result; - - buf->commit_count = - zmalloc(sizeof(*buf->commit_count) * n_subbufs); - if (!buf->commit_count) - return -ENOMEM; - urcu_ref_get(&trace->urcu_ref); - urcu_ref_get(&trace->ltt_transport_urcu_ref); - urcu_ref_get(<t_chan->urcu_ref); - uatomic_set(&buf->offset, ltt_subbuffer_header_size()); - uatomic_set(&buf->consumed, 0); - uatomic_set(&buf->active_readers, 0); - for (j = 0; j < n_subbufs; j++) { - uatomic_set(&buf->commit_count[j].cc, 0); - uatomic_set(&buf->commit_count[j].cc_sb, 0); - } -//ust// init_waitqueue_head(&buf->write_wait); -//ust// uatomic_set(&buf->wakeup_readers, 0); -//ust// spin_lock_init(&buf->full_lock); - - ltt_buffer_begin(buf, trace->start_tsc, 0); - - uatomic_add(&buf->commit_count[0].cc, ltt_subbuffer_header_size()); - - uatomic_set(&buf->events_lost, 0); - uatomic_set(&buf->corrupted_subbuffers, 0); - - result = pipe(fds); - if(result == -1) { - PERROR("pipe"); - return -1; - } - buf->data_ready_fd_read = fds[0]; - buf->data_ready_fd_write = fds[1]; - -//ust// buf->commit_seq = malloc(sizeof(buf->commit_seq) * n_subbufs); -//ust// if(!ltt_buf->commit_seq) { -//ust// return -1; -//ust// } - memset(buf->commit_seq, 0, sizeof(buf->commit_seq[0]) * n_subbufs); - - /* FIXME: decrementally destroy on error */ - - return 0; -} - -/* FIXME: use this function */ -static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu) -{ - struct ust_trace *trace = ltt_chan->trace; - struct ust_buffer *ltt_buf = ltt_chan->buf[cpu]; - - urcu_ref_put(<t_chan->trace->ltt_transport_urcu_ref, - ltt_release_transport); - ltt_relay_print_buffer_errors(ltt_chan, cpu); -//ust// free(ltt_buf->commit_seq); - free(ltt_buf->commit_count); - ltt_buf->commit_count = NULL; - urcu_ref_put(<t_chan->urcu_ref, ltt_relay_release_channel); - urcu_ref_put(&trace->urcu_ref, ltt_release_trace); -//ust// wake_up_interruptible(&trace->kref_wq); -} - -static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan) +static int map_buf_structs(struct ust_channel *chan) { void *ptr; int result; @@ -722,7 +609,6 @@ static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan) goto destroy_previous; } - /* FIXME: should have matching call to shmdt */ ptr = shmat(chan->buf_struct_shmids[i], NULL, 0); if(ptr == (void *) -1) { perror("shmat"); @@ -760,57 +646,86 @@ static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan) return -1; } +static int unmap_buf_structs(struct ust_channel *chan) +{ + int i; + + for (i=0; i < chan->n_cpus; i++) { + if (shmdt(chan->buf[i]) < 0) { + PERROR("shmdt"); + } + } +} + /* * Create channel. */ -static int ust_buffers_create_channel(const char *trace_name, struct ust_trace *trace, - const char *channel_name, struct ust_channel *ltt_chan, +static int create_channel(const char *trace_name, struct ust_trace *trace, + const char *channel_name, struct ust_channel *chan, unsigned int subbuf_size, unsigned int n_subbufs, int overwrite) { - int result; + int i, result; - urcu_ref_init(<t_chan->urcu_ref); + chan->trace = trace; + chan->overwrite = overwrite; + chan->n_subbufs_order = get_count_order(n_subbufs); + chan->commit_count_mask = (~0UL >> chan->n_subbufs_order); + chan->n_cpus = get_n_cpus(); - ltt_chan->trace = trace; - ltt_chan->overwrite = overwrite; - ltt_chan->n_subbufs_order = get_count_order(n_subbufs); - ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order); - ltt_chan->n_cpus = get_n_cpus(); -//ust// ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map); - ltt_chan->buf = (void *) zmalloc(ltt_chan->n_cpus * sizeof(void *)); - if(ltt_chan->buf == NULL) { + /* These mappings should ideall be per-cpu, if somebody can do that + * from userspace, that would be cool! + */ + chan->buf = (void *) zmalloc(chan->n_cpus * sizeof(void *)); + if(chan->buf == NULL) { goto error; } - ltt_chan->buf_struct_shmids = (int *) zmalloc(ltt_chan->n_cpus * sizeof(int)); - if(ltt_chan->buf_struct_shmids == NULL) + chan->buf_struct_shmids = (int *) zmalloc(chan->n_cpus * sizeof(int)); + if(chan->buf_struct_shmids == NULL) goto free_buf; - result = ust_buffers_alloc_channel_buf_structs(ltt_chan); + result = map_buf_structs(chan); if(result != 0) { goto free_buf_struct_shmids; } - result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs); + result = open_channel(chan, subbuf_size, n_subbufs); if (result != 0) { ERR("Cannot open channel for trace %s", trace_name); - goto unalloc_buf_structs; + goto unmap_buf_structs; } return 0; -unalloc_buf_structs: - /* FIXME: put a call here to unalloc the buf structs! */ +unmap_buf_structs: + for (i=0; i < chan->n_cpus; i++) { + if (shmdt(chan->buf[i]) < 0) { + PERROR("shmdt bufstruct"); + } + } free_buf_struct_shmids: - free(ltt_chan->buf_struct_shmids); + free(chan->buf_struct_shmids); free_buf: - free(ltt_chan->buf); + free(chan->buf); error: return -1; } + +static void remove_channel(struct ust_channel *chan) +{ + close_channel(chan); + + unmap_buf_structs(chan); + + free(chan->buf_struct_shmids); + + free(chan->buf); + +} + static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel) { //ust// unsigned int i; @@ -834,20 +749,14 @@ static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cp if (channel->buf[cpu]) { struct ust_buffer *buf = channel->buf[cpu]; ltt_force_switch(buf, FORCE_FLUSH); -//ust// ltt_relay_wake_writers(ltt_buf); + /* closing the pipe tells the consumer the buffer is finished */ - - //result = write(ltt_buf->data_ready_fd_write, "D", 1); - //if(result == -1) { - // PERROR("write (in ltt_relay_finish_buffer)"); - // ERR("this should never happen!"); - //} close(buf->data_ready_fd_write); } } -static void ltt_relay_finish_channel(struct ust_channel *channel) +static void finish_channel(struct ust_channel *channel) { unsigned int i; @@ -856,11 +765,6 @@ static void ltt_relay_finish_channel(struct ust_channel *channel) } } -static void ltt_relay_remove_channel(struct ust_channel *channel) -{ - ust_buffers_channel_close(channel); - urcu_ref_put(&channel->urcu_ref, ltt_relay_release_channel); -} /* * ltt_reserve_switch_old_subbuf: switch old subbuffer @@ -895,7 +799,7 @@ static void ltt_reserve_switch_old_subbuf( /* * Must write slot data before incrementing commit count. - * This compiler cmm_barrier is upgraded into a cmm_smp_wmb() by the IPI + * This compiler barrier is upgraded into a cmm_smp_wmb() by the IPI * sent by get_subbuf() when it does its cmm_smp_rmb(). */ cmm_smp_wmb(); @@ -924,7 +828,7 @@ static void ltt_reserve_switch_new_subbuf( /* * Must write slot data before incrementing commit count. - * This compiler cmm_barrier is upgraded into a cmm_smp_wmb() by the IPI + * This compiler barrier is upgraded into a cmm_smp_wmb() by the IPI * sent by get_subbuf() when it does its cmm_smp_rmb(). */ cmm_smp_wmb(); @@ -969,7 +873,7 @@ static void ltt_reserve_end_switch_current( /* * Must write slot data before incrementing commit count. - * This compiler cmm_barrier is upgraded into a cmm_smp_wmb() by the IPI + * This compiler barrier is upgraded into a cmm_smp_wmb() by the IPI * sent by get_subbuf() when it does its cmm_smp_rmb(). */ cmm_smp_wmb(); @@ -1298,9 +1202,9 @@ int ltt_reserve_slot_lockless_slow(struct ust_channel *chan, static struct ltt_transport ust_relay_transport = { .name = "ustrelay", .ops = { - .create_channel = ust_buffers_create_channel, - .finish_channel = ltt_relay_finish_channel, - .remove_channel = ltt_relay_remove_channel, + .create_channel = create_channel, + .finish_channel = finish_channel, + .remove_channel = remove_channel, .wakeup_channel = ltt_relay_async_wakeup_chan, }, }; diff --git a/libust/tracer.c b/libust/tracer.c index eb44f54..435c22d 100644 --- a/libust/tracer.c +++ b/libust/tracer.c @@ -795,9 +795,6 @@ static void __ltt_trace_destroy(struct ust_trace *trace, int drop) } } - return; /* FIXME: temporary for ust */ -//ust// flush_scheduled_work(); - /* * The currently destroyed trace is not in the trace list anymore, * so it's safe to call the async wakeup ourself. It will deliver