return POLLERR;
if (channel->ops->is_finalized(channel->chan))
return POLLHUP;
- else
+ if (channel->ops->buffer_has_read_closed_stream(channel->chan))
return POLLIN | POLLRDNORM;
+ return 0;
}
return mask;
unsigned int read_timer_interval);
void (*channel_destroy)(struct channel *chan);
struct lib_ring_buffer *(*buffer_read_open)(struct channel *chan);
+ int (*buffer_has_read_closed_stream)(struct channel *chan);
void (*buffer_read_close)(struct lib_ring_buffer *buf);
int (*event_reserve)(struct lib_ring_buffer_ctx *ctx,
uint32_t event_id);
return NULL;
}
+static
+int ltt_buffer_has_read_closed_stream(struct channel *chan)
+{
+ struct lib_ring_buffer *buf;
+ int cpu;
+
+ for_each_channel_cpu(cpu, chan) {
+ buf = channel_get_ring_buffer(&client_config, chan, cpu);
+ if (!atomic_long_read(&buf->active_readers))
+ return 1;
+ }
+ return 0;
+}
+
static
void ltt_buffer_read_close(struct lib_ring_buffer *buf)
{
.channel_create = _channel_create,
.channel_destroy = ltt_channel_destroy,
.buffer_read_open = ltt_buffer_read_open,
+ .buffer_has_read_closed_stream =
+ ltt_buffer_has_read_closed_stream,
.buffer_read_close = ltt_buffer_read_close,
.event_reserve = ltt_event_reserve,
.event_commit = ltt_event_commit,
return NULL;
}
+static
+int ltt_buffer_has_read_closed_stream(struct channel *chan)
+{
+ struct lib_ring_buffer *buf;
+ int cpu;
+
+ for_each_channel_cpu(cpu, chan) {
+ buf = channel_get_ring_buffer(&client_config, chan, cpu);
+ if (!atomic_long_read(&buf->active_readers))
+ return 1;
+ }
+ return 0;
+}
+
static
void ltt_buffer_read_close(struct lib_ring_buffer *buf)
{
.channel_create = _channel_create,
.channel_destroy = ltt_channel_destroy,
.buffer_read_open = ltt_buffer_read_open,
+ .buffer_has_read_closed_stream =
+ ltt_buffer_has_read_closed_stream,
.buffer_read_close = ltt_buffer_read_close,
.event_reserve = ltt_event_reserve,
.event_commit = ltt_event_commit,