X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=786a1db7bd93b41ef36f27a978c424d1b434aeaf;hb=6d67e4b4a3fb93ef10449e00d42f8344355c2d78;hp=1dcc52c6a648f2d15f8f48b53b27e3fbcf67769a;hpb=f1dab979da8fc9a7ad198d84ef9fc18b60d5f8de;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 1dcc52c6a..786a1db7b 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -164,20 +164,20 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, return stream; } -static void steal_stream_key(int key, struct lttng_ht *ht) +static void steal_stream_key(uint64_t key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; rcu_read_lock(); stream = find_stream(key, ht); if (stream) { - stream->key = -1ULL; + stream->key = (uint64_t) -1ULL; /* * We don't want the lookup to match, but we still need * to iterate on this stream when iterating over the hash table. Just * change the node key. */ - stream->node.key = -1ULL; + stream->node.key = (uint64_t) -1ULL; } rcu_read_unlock(); } @@ -292,6 +292,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -321,6 +322,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) call_rcu(&channel->node.head, free_channel_rcu); end: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); } @@ -352,13 +354,13 @@ static void cleanup_relayd_ht(void) * It's atomically set without having the stream mutex locked which is fine * because we handle the write/read race with a pipe wakeup for each thread. */ -static void update_endpoint_status_by_netidx(int net_seq_idx, +static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, enum consumer_endpoint_status status) { struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; - DBG("Consumer set delete flag on stream by idx %d", net_seq_idx); + DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); rcu_read_lock(); @@ -391,7 +393,7 @@ static void update_endpoint_status_by_netidx(int net_seq_idx, static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, struct lttng_consumer_local_data *ctx) { - int netidx; + uint64_t netidx; assert(relayd); @@ -562,6 +564,19 @@ free_stream_rcu: call_rcu(&stream->node.head, free_stream_rcu); } +/* + * XXX naming of del vs destroy is all mixed up. + */ +void consumer_del_stream_for_data(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + +void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, metadata_ht); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -589,11 +604,13 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->key = stream_key; stream->out_fd = -1; stream->out_fd_offset = 0; + stream->output_written = 0; stream->state = state; stream->uid = uid; stream->gid = gid; stream->net_seq_idx = relayd_id; stream->session_id = session_id; + stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ @@ -639,9 +656,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -static int add_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_data_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = data_ht; int ret = 0; struct consumer_relayd_sock_pair *relayd; @@ -651,6 +668,8 @@ static int add_stream(struct lttng_consumer_stream *stream, DBG3("Adding consumer stream %" PRIu64, stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); rcu_read_lock(); @@ -694,11 +713,18 @@ static int add_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->timer_lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } +void consumer_del_data_stream(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + /* * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. @@ -727,12 +753,12 @@ end: * Allocate and return a consumer relayd socket. */ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( - int net_seq_idx) + uint64_t net_seq_idx) { struct consumer_relayd_sock_pair *obj = NULL; - /* Negative net sequence index is a failure */ - if (net_seq_idx < 0) { + /* net sequence index of -1 is a failure */ + if (net_seq_idx == (uint64_t) -1ULL) { goto error; } @@ -858,7 +884,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t relayd_id, enum lttng_event_output output, uint64_t tracefile_size, - uint64_t tracefile_count) + uint64_t tracefile_count, + uint64_t session_id_per_pid) { struct lttng_consumer_channel *channel; @@ -871,12 +898,15 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->key = key; channel->refcount = 0; channel->session_id = session_id; + channel->session_id_per_pid = session_id_per_pid; channel->uid = uid; channel->gid = gid; channel->relayd_id = relayd_id; channel->output = output; channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; + pthread_mutex_init(&channel->lock, NULL); + pthread_mutex_init(&channel->timer_lock, NULL); strncpy(channel->pathname, pathname, sizeof(channel->pathname)); channel->pathname[sizeof(channel->pathname) - 1] = '\0'; @@ -909,6 +939,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); + pthread_mutex_lock(&channel->timer_lock); rcu_read_lock(); lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); @@ -925,6 +957,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->timer_lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); if (!ret && channel->wait_fd != -1 && @@ -1160,7 +1194,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), - int (*update_stream)(int stream_key, uint32_t state)) + int (*update_stream)(uint64_t stream_key, uint32_t state)) { int ret; struct lttng_consumer_local_data *ctx; @@ -1331,9 +1365,10 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { + if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { + ret = -EPIPE; goto end; } } @@ -1343,28 +1378,31 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( case LTTNG_CONSUMER_KERNEL: mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret != 0) { + PERROR("tracer ctl get_mmap_read_offset"); + written = -errno; + goto end; + } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: mmap_base = lttng_ustctl_get_mmap_base(stream); if (!mmap_base) { ERR("read mmap get mmap base for stream %s", stream->name); - written = -1; + written = -EPERM; goto end; } ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); - + if (ret != 0) { + PERROR("tracer ctl get_mmap_read_offset"); + written = ret; + goto end; + } break; default: ERR("Unknown consumer_data type"); assert(0); } - if (ret != 0) { - errno = -ret; - PERROR("tracer ctl get_mmap_read_offset"); - written = ret; - goto end; - } /* Handle stream on the relayd if the output is on the network */ if (relayd) { @@ -1427,6 +1465,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( outfd = stream->out_fd = ret; /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; } @@ -1445,7 +1485,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( */ DBG("Error in file write mmap"); if (written == 0) { - written = ret; + written = -errno; } /* Socket operation failed. We consider the relayd dead */ if (errno == EPIPE || errno == EINVAL) { @@ -1469,6 +1509,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret; } + stream->output_written += ret; written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -1530,9 +1571,10 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { + if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { + ret = -EPIPE; goto end; } } @@ -1609,6 +1651,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( outfd = stream->out_fd = ret; /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; } @@ -1679,6 +1723,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret_splice; } + stream->output_written += ret_splice; written += ret_splice; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -1884,6 +1929,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); switch (consumer_data.type) { @@ -1972,12 +2018,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, end: /* * Nullify the stream reference so it is not used after deletion. The - * consumer data lock MUST be acquired before being able to check for a - * NULL pointer value. + * channel lock MUST be acquired before being able to check for + * a NULL pointer value. */ stream->chan->metadata_stream = NULL; pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { @@ -1992,9 +2039,9 @@ free_stream_rcu: * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -static int add_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = metadata_ht; int ret = 0; struct consumer_relayd_sock_pair *relayd; struct lttng_ht_iter iter; @@ -2006,6 +2053,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); /* @@ -2057,6 +2106,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); + pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -2202,7 +2253,7 @@ restart: pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, sizeof(stream)); if (pipe_len < 0) { - ERR("read metadata stream, ret: %ld", pipe_len); + ERR("read metadata stream, ret: %zd", pipe_len); /* * Continue here to handle the rest of the streams. */ @@ -2219,14 +2270,6 @@ restart: DBG("Adding metadata stream %d to poll set", stream->wait_fd); - ret = add_metadata_stream(stream, metadata_ht); - if (ret) { - ERR("Unable to add metadata stream"); - /* Stream was not setup properly. Continuing. */ - consumer_del_metadata_stream(stream, NULL); - continue; - } - /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI); @@ -2331,7 +2374,11 @@ void *consumer_thread_data_poll(void *data) goto end; } - local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream *)); + if (local_stream == NULL) { + PERROR("local_stream malloc"); + goto end; + } while (1) { high_prio = 0; @@ -2414,7 +2461,7 @@ void *consumer_thread_data_poll(void *data) pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, &new_stream, sizeof(new_stream)); if (pipe_readlen < 0) { - ERR("Consumer data pipe ret %ld", pipe_readlen); + ERR("Consumer data pipe ret %zd", pipe_readlen); /* Continue so we can at least handle the current stream(s). */ continue; } @@ -2429,17 +2476,6 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = add_stream(new_stream, data_ht); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ - consumer_del_stream(new_stream, NULL); - } - /* Continue to update the local streams and handle prio ones */ continue; } @@ -3069,10 +3105,10 @@ void lttng_consumer_init(void) * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ -int consumer_add_relayd_socket(int net_seq_idx, int sock_type, +int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id) + struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; @@ -3081,18 +3117,20 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, assert(ctx); assert(relayd_sock); - DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); /* Get relayd reference if exists. */ relayd = consumer_find_relayd(net_seq_idx); if (relayd == NULL) { + assert(sock_type == LTTNG_STREAM_CONTROL); /* Not found. Allocate one. */ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); if (relayd == NULL) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; ret = -ENOMEM; + ret_code = LTTCOMM_CONSUMERD_ENOMEM; + goto error; } else { - relayd->sessiond_session_id = (uint64_t) sessiond_id; + relayd->sessiond_session_id = sessiond_id; relayd_created = 1; } @@ -3101,26 +3139,31 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, * we can notify the session daemon and continue our work without * killing everything. */ + } else { + /* + * relayd key should never be found for control socket. + */ + assert(sock_type != LTTNG_STREAM_CONTROL); } /* First send a status message before receiving the fds. */ - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { + ret = consumer_send_status_msg(sock, LTTNG_OK); + if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error; + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + goto error_nosignal; } /* Poll on consumer socket. */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); ret = -EINTR; - goto error; + goto error_nosignal; } /* Get relayd socket from session daemon */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD; ret = -1; fd = -1; /* Just in case it gets set with an invalid value. */ @@ -3134,18 +3177,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, * issue when reaching the fd limit. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); - - /* - * This code path MUST continue to the consumer send status message so - * we can send the error to the thread expecting a reply. The above - * call will make everything stop. - */ - } - - /* We have the fds without error. Send status back. */ - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { - /* Somehow, the session daemon is not responding anymore. */ + ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD; goto error; } @@ -3155,16 +3187,19 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); ret = lttcomm_create_sock(&relayd->control_sock.sock); - /* Immediately try to close the created socket if valid. */ - if (relayd->control_sock.sock.fd >= 0) { - if (close(relayd->control_sock.sock.fd)) { - PERROR("close relayd control socket"); - } - } /* Handle create_sock error. */ if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } + /* + * Close the socket created internally by + * lttcomm_create_sock, so we can replace it by the one + * received from sessiond. + */ + if (close(relayd->control_sock.sock.fd)) { + PERROR("close"); + } /* Assign new file descriptor */ relayd->control_sock.sock.fd = fd; @@ -3193,6 +3228,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, */ (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; } @@ -3201,16 +3237,19 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); ret = lttcomm_create_sock(&relayd->data_sock.sock); - /* Immediately try to close the created socket if valid. */ - if (relayd->data_sock.sock.fd >= 0) { - if (close(relayd->data_sock.sock.fd)) { - PERROR("close relayd data socket"); - } - } /* Handle create_sock error. */ if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } + /* + * Close the socket created internally by + * lttcomm_create_sock, so we can replace it by the one + * received from sessiond. + */ + if (close(relayd->data_sock.sock.fd)) { + PERROR("close"); + } /* Assign new file descriptor */ relayd->data_sock.sock.fd = fd; @@ -3222,6 +3261,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, default: ERR("Unknown relayd socket type (%d)", sock_type); ret = -1; + ret_code = LTTCOMM_CONSUMERD_FATAL; goto error; } @@ -3229,6 +3269,14 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); + /* We successfully added the socket. Send status back. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + goto error_nosignal; + } + /* * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. @@ -3239,6 +3287,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, return 0; error: + if (consumer_send_status_msg(sock, ret_code) < 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + } + +error_nosignal: /* Close received socket if valid. */ if (fd >= 0) { if (close(fd)) { @@ -3378,6 +3431,15 @@ int consumer_data_pending(uint64_t id) */ ret = cds_lfht_is_node_deleted(&stream->node.node); if (!ret) { + /* + * An empty output file is not valid. We need at least one packet + * generated per stream, even if it contains no event, so it + * contains at least one packet header. + */ + if (stream->output_written == 0) { + pthread_mutex_unlock(&stream->lock); + goto data_pending; + } /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) {