ust/ringbuffer-abi.h \
ust/lttng-tracer.h \
ust/usterr-signal-safe.h \
- ust/core.h \
+ ust/config.h \
ust/share.h \
- ust/ust.h
+ ust/ust.h \
+ ust/core.h
# note: usterr-signal-safe.h, core.h and share.h need namespace cleanup.
usterr.h \
ust_snprintf.h \
ust/compat.h \
- ust/config.h \
ust/marker-internal.h \
ust/tracepoint-internal.h \
ust/clock.h \
ust/probe-internal.h \
- ust/processor.h \
ust/kcompat/kcompat.h \
ust/kcompat/jhash.h \
ust/kcompat/compiler.h \
ust/kcompat/types.h \
ust/stringify.h \
- ust/wait.h
+ ust/wait.h \
+ ust/ringbuffer-config.h \
+ ust/processor.h
* channel.
*/
extern
-void *channel_destroy(struct channel *chan, struct shm_handle *handle);
+void *channel_destroy(struct channel *chan, struct shm_handle *handle,
+ int shadow);
/* Buffer read operations */
int *shm_fd, int *wait_fd,
uint64_t *memory_map_size);
extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
- struct shm_handle *handle);
+ struct shm_handle *handle,
+ int shadow);
extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
- struct shm_handle *handle);
+ struct shm_handle *handle,
+ int shadow);
/*
* Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer.
* RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
*/
if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
- && uatomic_read(&buf->active_readers)
+ && (uatomic_read(&buf->active_readers)
+ || uatomic_read(&buf->active_shadow_readers))
&& lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref);
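The comment above describes the consumer protocol: snapshot the produced/consumed positions, iterate get_subbuf/put_subbuf over the readable sub-buffers, then move the consumer position. A rough reader-loop sketch of that sequence, assuming the frontend's snapshot/get_subbuf/put_subbuf/move_consumer entry points take the same (buf, handle) pair used throughout this patch and that buf, chan and handle are set up as in the surrounding code (signatures here are illustrative, not taken from this diff):

	unsigned long consumed, produced;

	/* 1) Snapshot the current read window. */
	if (lib_ring_buffer_snapshot(buf, &consumed, &produced, handle) < 0)
		return;		/* nothing readable right now */

	/* 2) Walk the readable sub-buffers. */
	while ((long) (produced - consumed) > 0) {
		if (lib_ring_buffer_get_subbuf(buf, consumed, handle) < 0)
			break;	/* sub-buffer still owned by the writer */
		/* ... copy or parse the sub-buffer contents here ... */
		lib_ring_buffer_put_subbuf(buf, handle);
		consumed += chan->backend.subbuf_size;
	}

	/* 3) Publish the new consumer position last. */
	lib_ring_buffer_move_consumer(buf, consumed, handle);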
* Active readers count
* standard atomic access (shared)
*/
+ long active_shadow_readers;	/* Shadow readers count */
/* Dropped records */
union v_atomic records_lost_full; /* Buffer full */
union v_atomic records_lost_wrap; /* Nested wrap-around */
/*
* Only flush buffers periodically if readers are active.
*/
- if (uatomic_read(&buf->active_readers))
+ if (uatomic_read(&buf->active_readers)
+     || uatomic_read(&buf->active_shadow_readers))
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
//TODO timers
CHAN_WARN_ON(chan, !buf->backend.allocated);
- if (uatomic_read(&buf->active_readers)
+ if ((uatomic_read(&buf->active_readers)
+     || uatomic_read(&buf->active_shadow_readers))
&& lib_ring_buffer_poll_deliver(config, buf, chan)) {
//TODO
//wake_up_interruptible(&buf->read_wait);
//channel_backend_unregister_notifiers(&chan->backend);
}
-static void channel_free(struct channel *chan, struct shm_handle *handle)
+static void channel_free(struct channel *chan, struct shm_handle *handle,
+ int shadow)
{
int ret;
- channel_backend_free(&chan->backend, handle);
+ if (!shadow)
+ channel_backend_free(&chan->backend, handle);
/* chan is freed by shm teardown */
shm_object_table_destroy(handle->table);
free(handle);
}
static
-void channel_release(struct channel *chan, struct shm_handle *handle)
+void channel_release(struct channel *chan, struct shm_handle *handle,
+ int shadow)
{
- channel_free(chan, handle);
+ channel_free(chan, handle, shadow);
}
/**
* They should release their handle at that point. Returns the private
* data pointer.
*/
-void *channel_destroy(struct channel *chan, struct shm_handle *handle)
+void *channel_destroy(struct channel *chan, struct shm_handle *handle,
+ int shadow)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
void *priv;
int cpu;
+ if (shadow) {
+ channel_release(chan, handle, shadow);
+ return NULL;
+ }
+
channel_unregister_notifiers(chan, handle);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
* descriptor directly. No need to refcount.
*/
priv = chan->backend.priv;
- channel_release(chan, handle);
+ channel_release(chan, handle, shadow);
return priv;
}
}
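With the new shadow argument, teardown splits in two paths: the owner (shadow == 0) still unregisters notifiers, frees the backend and hands back the channel's private data, while a process that only holds a shadow mapping just releases its own shm handle and gets NULL. A hypothetical caller-side sketch; remote_chan and remote_handle are illustrative names for a consumer's shadow mapping:

	/* Tracer side: owns the channel, recovers the private pointer. */
	priv = channel_destroy(ltt_chan->chan, ltt_chan->handle, 0);

	/* Consumer side: only tears down its shadow mapping, returns NULL. */
	(void) channel_destroy(remote_chan, remote_handle, 1);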
int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
- struct shm_handle *handle)
+ struct shm_handle *handle,
+ int shadow)
{
struct channel *chan = shmp(handle, buf->backend.chan);
+ if (shadow) {
+ if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
+ return -EBUSY;
+ cmm_smp_mb();
+ return 0;
+ }
if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
return -EBUSY;
cmm_smp_mb();
}
void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
- struct shm_handle *handle)
+ struct shm_handle *handle,
+ int shadow)
{
struct channel *chan = shmp(handle, buf->backend.chan);
+ if (shadow) {
+ CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
+ cmm_smp_mb();
+ uatomic_dec(&buf->active_shadow_readers);
+ return;
+ }
CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
cmm_smp_mb();
uatomic_dec(&buf->active_readers);
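Open and release now manage two independent single-reader slots per buffer, so the tracer's own reader and a shadow reader in another process cannot collide on active_readers. A minimal usage sketch on the shadow side, with error handling trimmed:

	/* Claim the shadow-reader slot; fails with -EBUSY if already taken. */
	if (lib_ring_buffer_open_read(buf, handle, 1))
		return;
	/* ... snapshot / get_subbuf / put_subbuf / move_consumer ... */
	lib_ring_buffer_release_read(buf, handle, 1);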
struct channel *chan = shmp(handle, bufb->chan);
unsigned long consumed;
- CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
+ CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
+ && uatomic_read(&buf->active_shadow_readers) != 1);
/*
* Only push the consumed value forward.
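The "only push forward" rule is typically enforced with a compare-and-swap retry loop, so a stale caller can never rewind the consumer position. A rough sketch of that idea, assuming the position lives in a buf->consumed counter (an illustrative field name, not taken from this diff):

	consumed = uatomic_read(&buf->consumed);
	/* Retry only while consumed_new is still ahead of the current position. */
	while ((long) consumed - (long) consumed_new < 0)
		consumed = uatomic_cmpxchg(&buf->consumed, consumed, consumed_new);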
const struct lib_ring_buffer_config *config = &chan->backend.config;
unsigned long read_sb_bindex, consumed_idx, consumed;
- CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
+ CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
+ && uatomic_read(&buf->active_shadow_readers) != 1);
if (!buf->get_subbuf) {
/*
assert(0);
}
for (i = 0; i < 2; i++) {
+ if (obj->wait_fd[i] < 0)
+ continue;
ret = close(obj->wait_fd[i]);
if (ret) {
PERROR("close");
static
void ltt_channel_destroy(struct ltt_channel *ltt_chan)
{
- channel_destroy(ltt_chan->chan, ltt_chan->handle);
+ channel_destroy(ltt_chan->chan, ltt_chan->handle, 0);
}
static
buf = channel_get_ring_buffer(&client_config, chan,
cpu, handle, shm_fd, wait_fd,
memory_map_size);
- if (!lib_ring_buffer_open_read(buf, handle))
+ if (!lib_ring_buffer_open_read(buf, handle, 0))
return buf;
}
return NULL;
void ltt_buffer_read_close(struct lib_ring_buffer *buf,
struct shm_handle *handle)
{
- lib_ring_buffer_release_read(buf, handle);
+ lib_ring_buffer_release_read(buf, handle, 0);
}
static
static
void ltt_channel_destroy(struct ltt_channel *ltt_chan)
{
- channel_destroy(ltt_chan->chan, ltt_chan->handle);
+ channel_destroy(ltt_chan->chan, ltt_chan->handle, 0);
}
static
buf = channel_get_ring_buffer(&client_config, chan,
0, handle, shm_fd, wait_fd, memory_map_size);
- if (!lib_ring_buffer_open_read(buf, handle))
+ if (!lib_ring_buffer_open_read(buf, handle, 0))
return buf;
return NULL;
}
void ltt_buffer_read_close(struct lib_ring_buffer *buf,
struct shm_handle *handle)
{
- lib_ring_buffer_release_read(buf, handle);
+ lib_ring_buffer_release_read(buf, handle, 0);
}
static
for (i = 0; i < objd_table.allocated_len; i++) {
struct obj *obj = _objd_get(i);
- const struct objd_ops *ops;
- if (!obj)
- continue;
- ops = obj->u.s.ops;
- if (ops->release)
- ops->release(i);
+ (void) objd_unref(i);
}
free(objd_table.array);
objd_table.array = NULL;
buf = priv->buf;
channel = priv->ltt_chan;
free(priv);
+ /* Drop the buffer read-side reference before unreferencing the channel. */
+ channel->ops->buffer_read_close(buf, channel->handle);
return objd_unref(channel->objd);
}
shm_fd = lum->u.stream.shm_fd;
wait_fd = lum->u.stream.wait_fd;
break;
+ case LTTNG_UST_METADATA:	/* fall through: shares the channel reply layout */
case LTTNG_UST_CHANNEL:
lur.u.channel.memory_map_size = lum->u.channel.memory_map_size;
shm_fd = lum->u.channel.shm_fd;
goto error;
}
- if ((lum->cmd == LTTNG_UST_STREAM || lum->cmd == LTTNG_UST_CHANNEL)
+ if ((lum->cmd == LTTNG_UST_STREAM
+ || lum->cmd == LTTNG_UST_CHANNEL
+ || lum->cmd == LTTNG_UST_METADATA)
&& lur.ret_code == LTTCOMM_OK) {
/* we also need to send the file descriptors. */
ret = lttcomm_send_fds_unix_sock(sock,