Between threads, when the metadata stream is received, it is allocated.
We now pass the pointer to the metadata thread thus fixing a memory leak
because the original stream was never freed.
This commit also modified some debug statements and remove a duplicate
code snippet.
Signed-off-by: David Goulet <dgoulet@efficios.com>
close(ctx->consumer_metadata_pipe[0]);
continue;
} else if (revents & LPOLLIN) {
- stream = zmalloc(sizeof(struct lttng_consumer_stream));
- if (stream == NULL) {
- PERROR("zmalloc metadata consumer stream");
- goto error;
- }
-
do {
- /* Get the stream and add it to the local hash table */
- ret = read(pollfd, stream,
- sizeof(struct lttng_consumer_stream));
+ /* Get the stream pointer received */
+ ret = read(pollfd, &stream, sizeof(stream));
} while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(struct lttng_consumer_stream)) {
+ if (ret < 0 ||
+ ret < sizeof(struct lttng_consumer_stream *)) {
PERROR("read metadata stream");
- free(stream);
/*
* Let's continue here and hope we can still work
* without stopping the consumer. XXX: Should we?
goto end_nosignal;
}
- /* Send stream to the metadata thread */
- if (new_stream->metadata_flag) {
- if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
- goto end_nosignal;
- }
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(new_stream);
+ if (ret < 0) {
+ goto end_nosignal;
}
+ }
+ /* Send stream to the metadata thread */
+ if (new_stream->metadata_flag) {
do {
- ret = write(ctx->consumer_metadata_pipe[1], new_stream,
- sizeof(struct lttng_consumer_stream));
+ ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
+ sizeof(new_stream));
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
PERROR("write metadata pipe");
}
} else {
- if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
- goto end_nosignal;
- }
- }
consumer_add_stream(new_stream);
}
goto error;
}
- DBG3("Relayd sending command %d", cmd);
+ DBG3("Relayd sending command %d of size %" PRIu64, cmd, buf_size);
error:
free(buf);
{
int ret;
- DBG3("Relayd waiting for reply...");
+ DBG3("Relayd waiting for reply of size %ld", size);
ret = sock->ops->recvmsg(sock, data, size, 0);
if (ret < 0) {
goto error;
}
- /* Recevie response */
+ /* Waiting for reply */
ret = recv_reply(sock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
/*
* After that call, the metadata data MUST be sent to the relayd so the
* receive size on the other end matches the len of the metadata packet
- * header.
+ * header. This is why we don't wait for a reply here.
*/
error:
assert(sock);
assert(hdr);
- DBG3("Relayd sending data header...");
+ DBG3("Relayd sending data header of size %ld", size);
/* Again, safety net */
if (size == 0) {
goto end_nosignal;
}
- /* Send stream to the metadata thread */
- if (new_stream->metadata_flag) {
- if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
- goto end_nosignal;
- }
+ /* Do actions once stream has been received. */
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(new_stream);
+ if (ret < 0) {
+ goto end_nosignal;
}
+ }
+ /* Send stream to the metadata thread */
+ if (new_stream->metadata_flag) {
do {
- ret = write(ctx->consumer_metadata_pipe[1], new_stream,
- sizeof(struct lttng_consumer_stream));
+ ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
+ sizeof(new_stream));
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
PERROR("write metadata pipe");
}
} else {
- if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
- goto end_nosignal;
- }
- }
consumer_add_stream(new_stream);
}
ERR("Error writing to tracefile "
"(ret: %zd != len: %lu != subbuf_size: %lu)",
ret, len, subbuf_size);
-
}
err = ustctl_put_next_subbuf(handle, buf);
assert(err == 0);