consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
}
}
}
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
}
}
}
pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
}
/*
+ * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+ * performed. This type of flush ensures that a new packet is produced no
+ * matter the consumed/produced positions are.
+ *
+ * This, in turn, causes the next pass to see that data available for the
+ * stream. When we come back here, we can be assured that all available
+ * data has been consumed and we can finally destroy the stream.
+ *
* If the poll flag is HUP/ERR/NVAL and we have
* read no data in this pass, we can remove the
* stream from its hash table.
*/
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
}
if (local_stream[i] != NULL) {
- local_stream[i]->data_read = 0;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
}
}
}