X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=3bb2e25d08333348f42c0c97060dc59496d32a5e;hb=dc5141b9bf8323b45582c255fd3afd541fa63e14;hp=129203fa5a9bb8ca655ae22c428d9531a1368adb;hpb=65da3cc731c72486d53d58c9b080f235121177d9;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 129203fa5..3bb2e25d0 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -47,6 +47,7 @@ #include "consumer.h" #include "consumer-stream.h" #include "consumer-testpoint.h" +#include "align.h" struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -2180,14 +2181,6 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { - health_code_update(); - - /* Only the metadata pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: health_code_update(); health_poll_entry(); @@ -2202,7 +2195,10 @@ restart: ERR("Poll EINTR catched"); goto restart; } - goto error; + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } + goto end; } nb_fd = ret; @@ -2335,7 +2331,6 @@ restart: /* All is OK */ err = 0; -error: end: DBG("Metadata poll thread exiting"); @@ -2758,14 +2753,6 @@ void *consumer_thread_channel_poll(void *data) DBG("Channel main loop started"); while (1) { - health_code_update(); - - /* Only the channel pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: health_code_update(); DBG("Channel poll wait"); @@ -2780,6 +2767,9 @@ restart: ERR("Poll EINTR catched"); goto restart; } + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } goto end; } @@ -3661,22 +3651,19 @@ int consumer_send_status_channel(int sock, return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); } -/* - * Using a maximum stream size with the produced and consumed position of a - * stream, computes the new consumed position to be as close as possible to the - * maximum possible stream size. - * - * If maximum stream size is lower than the possible buffer size (produced - - * consumed), the consumed_pos given is returned untouched else the new value - * is returned. - */ -unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos, - unsigned long produced_pos, uint64_t max_stream_size) +unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, + unsigned long produced_pos, uint64_t nb_packets_per_stream, + uint64_t max_sb_size) { - if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) { - /* Offset from the produced position to get the latest buffers. */ - return produced_pos - max_stream_size; - } + unsigned long start_pos; - return consumed_pos; + if (!nb_packets_per_stream) { + return consumed_pos; /* Grab everything */ + } + start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size); + start_pos -= max_sb_size * nb_packets_per_stream; + if ((long) (start_pos - consumed_pos) < 0) { + return consumed_pos; /* Grab everything */ + } + return start_pos; }