X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=301a8bc7ac52d3574c89ff73d5fa975ae75001a6;hb=47287c8c2ec367b9718dc1eb0f5aef4f492637af;hp=26fc819f6ee64c6d572ff2012ec1d1ff25922434;hpb=fa29bfbf73e837b936d80b4d5a1206dfb8496f07;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 26fc819f6..301a8bc7a 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2012 David Goulet * @@ -8,6 +8,7 @@ */ #include "common/index/ctf-index.h" +#include #define _LGPL_SOURCE #include #include @@ -3382,9 +3383,12 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, ssize_t ret, written_bytes = 0; int rotation_ret; struct stream_subbuffer subbuffer = {}; + enum get_next_subbuffer_status get_next_status; if (!locked_by_caller) { stream->read_subbuffer_ops.lock(stream); + } else { + stream->read_subbuffer_ops.assert_locked(stream); } if (stream->read_subbuffer_ops.on_wake_up) { @@ -3407,14 +3411,20 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, } } - ret = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer); - if (ret) { - if (ret == -ENODATA) { - /* Not an error. */ - ret = 0; - goto sleep_stream; - } + get_next_status = stream->read_subbuffer_ops.get_next_subbuffer( + stream, &subbuffer); + switch (get_next_status) { + case GET_NEXT_SUBBUFFER_STATUS_OK: + break; + case GET_NEXT_SUBBUFFER_STATUS_NO_DATA: + /* Not an error. */ + ret = 0; + goto sleep_stream; + case GET_NEXT_SUBBUFFER_STATUS_ERROR: + ret = -1; goto end; + default: + abort(); } ret = stream->read_subbuffer_ops.pre_consume_subbuffer( @@ -3554,18 +3564,22 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ - void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, - struct lttng_consumer_local_data *ctx, int sock, +void consumer_add_relayd_socket(uint64_t net_seq_idx, + int sock_type, + struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, - uint64_t relayd_session_id) + uint64_t sessiond_id, + uint64_t relayd_session_id, + uint32_t relayd_version_major, + uint32_t relayd_version_minor, + enum lttcomm_sock_proto relayd_socket_protocol) { int fd = -1, ret = -1, relayd_created = 0; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct consumer_relayd_sock_pair *relayd = NULL; assert(ctx); - assert(relayd_sock); DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); @@ -3634,54 +3648,25 @@ error: switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->control_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->control_sock.sock.fd)) { - PERROR("close"); - } + ret = lttcomm_populate_sock_from_open_socket( + &relayd->control_sock.sock, fd, + relayd_socket_protocol); - /* Assign new file descriptor */ - relayd->control_sock.sock.fd = fd; /* Assign version values. */ - relayd->control_sock.major = relayd_sock->major; - relayd->control_sock.minor = relayd_sock->minor; + relayd->control_sock.major = relayd_version_major; + relayd->control_sock.minor = relayd_version_minor; relayd->relayd_session_id = relayd_session_id; break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->data_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->data_sock.sock.fd)) { - PERROR("close"); - } - - /* Assign new file descriptor */ - relayd->data_sock.sock.fd = fd; + ret = lttcomm_populate_sock_from_open_socket( + &relayd->data_sock.sock, fd, + relayd_socket_protocol); /* Assign version values. */ - relayd->data_sock.major = relayd_sock->major; - relayd->data_sock.minor = relayd_sock->minor; + relayd->data_sock.major = relayd_version_major; + relayd->data_sock.minor = relayd_version_minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type); @@ -3689,6 +3674,11 @@ error: goto error; } + if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); @@ -4392,7 +4382,11 @@ int consumer_clear_buffer(struct lttng_consumer_stream *stream) break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_clear_buffer(stream); + ret = lttng_ustconsumer_clear_buffer(stream); + if (ret < 0) { + ERR("Failed to clear ust stream (ret = %d)", ret); + goto end; + } break; default: ERR("Unknown consumer_data type"); @@ -5242,3 +5236,8 @@ error_unlock: pthread_mutex_unlock(&stream->lock); goto end_rcu_unlock; } + +void lttng_consumer_sigbus_handle(void *addr) +{ + lttng_ustconsumer_sigbus_handle(addr); +}