}
lttng_consumer_set_error_sock(ctx, ret);
- /* Create the thread to manage the receive of fd */
- ret = pthread_create(&threads[0], NULL, lttng_consumer_thread_receive_fds,
+ /* Create thread to manage the polling/writing of trace metadata */
+ ret = pthread_create(&threads[0], NULL, consumer_thread_metadata_poll,
+ (void *) ctx);
+ if (ret != 0) {
+ perror("pthread_create");
+ goto error;
+ }
+
+ /* Create thread to manage the polling/writing of trace data */
+ ret = pthread_create(&threads[1], NULL, consumer_thread_data_poll,
(void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
}
- /* Create thread to manage the polling/writing of traces */
- ret = pthread_create(&threads[1], NULL, lttng_consumer_thread_poll_fds,
+ /* Create the thread to manage the receive of fd */
+ ret = pthread_create(&threads[2], NULL, consumer_thread_sessiond_poll,
(void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
}
- for (i = 0; i < 2; i++) {
+ for (i = 0; i < 3; i++) {
ret = pthread_join(threads[i], &status);
if (ret != 0) {
perror("pthread_join");
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
*/
-void *lttng_consumer_thread_poll_metadata(void *data)
+void *consumer_thread_metadata_poll(void *data)
{
int ret, i, pollfd;
uint32_t revents, nb_fd;
* This thread polls the fds in the set to consume the data and write
* it to tracefile if necessary.
*/
-void *lttng_consumer_thread_poll_fds(void *data)
+void *consumer_thread_data_poll(void *data)
{
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
- pthread_t metadata_thread;
- void *status;
rcu_register_thread();
- /* Start metadata polling thread */
- ret = pthread_create(&metadata_thread, NULL,
- lttng_consumer_thread_poll_metadata, (void *) ctx);
- if (ret < 0) {
- PERROR("pthread_create metadata thread");
- goto end;
- }
-
local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
/*
* Close the write side of the pipe so epoll_wait() in
- * lttng_consumer_thread_poll_metadata can catch it. The thread is
- * monitoring the read side of the pipe. If we close them both, epoll_wait
- * strangely does not return and could create a endless wait period if the
- * pipe is the only tracked fd in the poll set. The thread will take care
- * of closing the read side.
+ * consumer_thread_metadata_poll can catch it. The thread is monitoring the
+ * read side of the pipe. If we close them both, epoll_wait strangely does
+ * not return and could create a endless wait period if the pipe is the
+ * only tracked fd in the poll set. The thread will take care of closing
+ * the read side.
*/
close(ctx->consumer_metadata_pipe[1]);
- if (ret) {
- ret = pthread_join(metadata_thread, &status);
- if (ret < 0) {
- PERROR("pthread_join metadata thread");
- }
- }
rcu_unregister_thread();
return NULL;
* This thread listens on the consumerd socket and receives the file
* descriptors from the session daemon.
*/
-void *lttng_consumer_thread_receive_fds(void *data)
+void *consumer_thread_sessiond_poll(void *data)
{
int sock, client_socket, ret;
/*
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream,
unsigned long *pos);
-extern void *lttng_consumer_thread_poll_fds(void *data);
-extern void *lttng_consumer_thread_receive_fds(void *data);
+extern void *consumer_thread_metadata_poll(void *data);
+extern void *consumer_thread_data_poll(void *data);
+extern void *consumer_thread_sessiond_poll(void *data);
extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);