The metadata thread is now created in the lttng-consumerd daemon so all
thread could be controlled inside the daemon.
This is the first step of a consumer thread refactoring which aims at
moving data and metadata stream operations inside a dedicated thread so
the session daemon thread does not block and is more efficient at adding
streams.
The most important concept is that a stream file descriptor MUST be
opened as quickly as we can then passed to the right thread (for UST
since they are already opened by the session daemon for the kernel).
Signed-off-by: David Goulet <dgoulet@efficios.com>
}
lttng_consumer_set_error_sock(ctx, ret);
}
lttng_consumer_set_error_sock(ctx, ret);
- /* Create the thread to manage the receive of fd */
- ret = pthread_create(&threads[0], NULL, lttng_consumer_thread_receive_fds,
+ /* Create thread to manage the polling/writing of trace metadata */
+ ret = pthread_create(&threads[0], NULL, consumer_thread_metadata_poll,
+ (void *) ctx);
+ if (ret != 0) {
+ perror("pthread_create");
+ goto error;
+ }
+
+ /* Create thread to manage the polling/writing of trace data */
+ ret = pthread_create(&threads[1], NULL, consumer_thread_data_poll,
(void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
}
(void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
}
- /* Create thread to manage the polling/writing of traces */
- ret = pthread_create(&threads[1], NULL, lttng_consumer_thread_poll_fds,
+ /* Create the thread to manage the receive of fd */
+ ret = pthread_create(&threads[2], NULL, consumer_thread_sessiond_poll,
(void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
}
(void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
}
- for (i = 0; i < 2; i++) {
+ for (i = 0; i < 3; i++) {
ret = pthread_join(threads[i], &status);
if (ret != 0) {
perror("pthread_join");
ret = pthread_join(threads[i], &status);
if (ret != 0) {
perror("pthread_join");
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
*/
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
*/
-void *lttng_consumer_thread_poll_metadata(void *data)
+void *consumer_thread_metadata_poll(void *data)
{
int ret, i, pollfd;
uint32_t revents, nb_fd;
{
int ret, i, pollfd;
uint32_t revents, nb_fd;
* This thread polls the fds in the set to consume the data and write
* it to tracefile if necessary.
*/
* This thread polls the fds in the set to consume the data and write
* it to tracefile if necessary.
*/
-void *lttng_consumer_thread_poll_fds(void *data)
+void *consumer_thread_data_poll(void *data)
{
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
{
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
- pthread_t metadata_thread;
- void *status;
- /* Start metadata polling thread */
- ret = pthread_create(&metadata_thread, NULL,
- lttng_consumer_thread_poll_metadata, (void *) ctx);
- if (ret < 0) {
- PERROR("pthread_create metadata thread");
- goto end;
- }
-
local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
/*
* Close the write side of the pipe so epoll_wait() in
/*
* Close the write side of the pipe so epoll_wait() in
- * lttng_consumer_thread_poll_metadata can catch it. The thread is
- * monitoring the read side of the pipe. If we close them both, epoll_wait
- * strangely does not return and could create a endless wait period if the
- * pipe is the only tracked fd in the poll set. The thread will take care
- * of closing the read side.
+ * consumer_thread_metadata_poll can catch it. The thread is monitoring the
+ * read side of the pipe. If we close them both, epoll_wait strangely does
+ * not return and could create a endless wait period if the pipe is the
+ * only tracked fd in the poll set. The thread will take care of closing
+ * the read side.
*/
close(ctx->consumer_metadata_pipe[1]);
*/
close(ctx->consumer_metadata_pipe[1]);
- if (ret) {
- ret = pthread_join(metadata_thread, &status);
- if (ret < 0) {
- PERROR("pthread_join metadata thread");
- }
- }
rcu_unregister_thread();
return NULL;
rcu_unregister_thread();
return NULL;
* This thread listens on the consumerd socket and receives the file
* descriptors from the session daemon.
*/
* This thread listens on the consumerd socket and receives the file
* descriptors from the session daemon.
*/
-void *lttng_consumer_thread_receive_fds(void *data)
+void *consumer_thread_sessiond_poll(void *data)
{
int sock, client_socket, ret;
/*
{
int sock, client_socket, ret;
/*
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream,
unsigned long *pos);
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream,
unsigned long *pos);
-extern void *lttng_consumer_thread_poll_fds(void *data);
-extern void *lttng_consumer_thread_receive_fds(void *data);
+extern void *consumer_thread_metadata_poll(void *data);
+extern void *consumer_thread_data_poll(void *data);
+extern void *consumer_thread_sessiond_poll(void *data);
extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);
extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);