+void process_blocked_consumers(void)
+{
+ int n_fds = 0;
+ struct pollfd *fds;
+ struct blocked_consumer *bc;
+ int idx = 0;
+ char inbuf;
+ int result;
+
+ list_for_each_entry(bc, &blocked_consumers, list) {
+ n_fds++;
+ }
+
+ fds = (struct pollfd *) malloc(n_fds * sizeof(struct pollfd));
+ if(fds == NULL) {
+ ERR("malloc returned NULL");
+ return;
+ }
+
+ list_for_each_entry(bc, &blocked_consumers, list) {
+ fds[idx].fd = bc->fd_producer;
+ fds[idx].events = POLLIN;
+ bc->tmp_poll_idx = idx;
+ idx++;
+ }
+
+ result = poll(fds, n_fds, 0);
+ if(result == -1) {
+ PERROR("poll");
+ return -1;
+ }
+
+ list_for_each_entry(bc, &blocked_consumers, list) {
+ if(fds[bc->tmp_poll_idx].revents) {
+ long consumed_old = 0;
+ char *reply;
+
+ result = read(bc->fd_producer, &inbuf, 1);
+ if(result == -1) {
+ PERROR("read");
+ continue;
+ }
+ if(result == 0) {
+ DBG("PRODUCER END");
+
+ close(bc->fd_producer);
+
+ __list_del(bc->list.prev, bc->list.next);
+
+ result = ustcomm_send_reply(&bc->server, "END", &bc->src);
+ if(result < 0) {
+ ERR("ustcomm_send_reply failed");
+ continue;
+ }
+
+ continue;
+ }
+
+ result = ltt_do_get_subbuf(bc->rbuf, bc->lttbuf, &consumed_old);
+ if(result == -EAGAIN) {
+ WARN("missed buffer?");
+ continue;
+ }
+ else if(result < 0) {
+ DBG("ltt_do_get_subbuf: error: %s", strerror(-result));
+ }
+ asprintf(&reply, "%s %ld", "OK", consumed_old);
+ result = ustcomm_send_reply(&bc->server, reply, &bc->src);
+ if(result < 0) {
+ ERR("ustcomm_send_reply failed");
+ free(reply);
+ continue;
+ }
+ free(reply);
+
+ __list_del(bc->list.prev, bc->list.next);
+ }
+ }
+
+}
+