X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=aacabdb76d4acc5e54bd3cb9434bd789e0b71ca0;hb=d36f818641aba7c437cde1f9f6a8983af39cfa21;hp=7acb8560ca4e07bfdccc89518477dad6a8d79051;hpb=e5d1a9b33aa0920bbd9f6948bd0676156da67c61;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 7acb8560c..aacabdb76 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -854,13 +854,26 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->uid = uid; channel->gid = gid; channel->relayd_id = relayd_id; - channel->output = output; channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); + switch (output) { + case LTTNG_EVENT_SPLICE: + channel->output = CONSUMER_CHANNEL_SPLICE; + break; + case LTTNG_EVENT_MMAP: + channel->output = CONSUMER_CHANNEL_MMAP; + break; + default: + assert(0); + free(channel); + channel = NULL; + goto end; + } + /* * In monitor mode, the streams associated with the channel will be put in * a special list ONLY owned by this channel. So, the refcount is set to 1 @@ -1235,6 +1248,57 @@ error: return NULL; } +/* + * Iterate over all streams of the hashtable and free them properly. + */ +static void destroy_data_stream_ht(struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + +/* + * Iterate over all streams of the metadata hashtable and free them + * properly. + */ +static void destroy_metadata_stream_ht(struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + /* * Close all fds associated with the instance and free the context. */ @@ -1244,6 +1308,9 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) DBG("Consumer destroying it. Closing everything."); + destroy_data_stream_ht(data_ht); + destroy_metadata_stream_ht(metadata_ht); + ret = close(ctx->consumer_error_socket); if (ret) { PERROR("close"); @@ -1790,60 +1857,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } -/* - * Iterate over all streams of the hashtable and free them properly. - * - * WARNING: *MUST* be used with data stream only. - */ -static void destroy_data_stream_ht(struct lttng_ht *ht) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - - if (ht == NULL) { - return; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); - } - rcu_read_unlock(); - - lttng_ht_destroy(ht); -} - -/* - * Iterate over all streams of the hashtable and free them properly. - * - * XXX: Should not be only for metadata stream or else use an other name. - */ -static void destroy_stream_ht(struct lttng_ht *ht) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - - if (ht == NULL) { - return; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); - } - rcu_read_unlock(); - - lttng_ht_destroy(ht); -} - void lttng_consumer_close_metadata(void) { switch (consumer_data.type) { @@ -2148,12 +2161,6 @@ void *consumer_thread_metadata_poll(void *data) rcu_register_thread(); - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!metadata_ht) { - /* ENOMEM at this point. Better to bail out. */ - goto end_ht; - } - DBG("Thread metadata poll started"); /* Size is set to 1 for the consumer_metadata pipe */ @@ -2197,11 +2204,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - /* Just don't waste time if no returned events for the fd */ - if (!revents) { - continue; - } - if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); @@ -2318,8 +2320,6 @@ end: lttng_poll_clean(&events); end_poll: - destroy_stream_ht(metadata_ht); -end_ht: rcu_unregister_thread(); return NULL; } @@ -2341,12 +2341,6 @@ void *consumer_thread_data_poll(void *data) rcu_register_thread(); - data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (data_ht == NULL) { - /* ENOMEM at this point. Better to bail out. */ - goto end; - } - local_stream = zmalloc(sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2563,8 +2557,6 @@ end: */ (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe); - destroy_data_stream_ht(data_ht); - rcu_unregister_thread(); return NULL; } @@ -3063,12 +3055,42 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) /* * Allocate and set consumer data hash tables. */ -void lttng_consumer_init(void) +int lttng_consumer_init(void) { consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.channel_ht) { + goto error; + } + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.relayd_ht) { + goto error; + } + consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.stream_list_ht) { + goto error; + } + consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.stream_per_chan_id_ht) { + goto error; + } + + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!data_ht) { + goto error; + } + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!metadata_ht) { + goto error; + } + + return 0; + +error: + return -1; } /* @@ -3083,7 +3105,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id) { int fd = -1, ret = -1, relayd_created = 0; - enum lttng_error_code ret_code = LTTNG_OK; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct consumer_relayd_sock_pair *relayd = NULL; assert(ctx); @@ -3119,7 +3141,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, } /* First send a status message before receiving the fds. */ - ret = consumer_send_status_msg(sock, LTTNG_OK); + ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); @@ -3502,9 +3524,9 @@ int consumer_send_status_channel(int sock, assert(sock >= 0); if (!channel) { - msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL; + msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } else { - msg.ret_code = LTTNG_OK; + msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS; msg.key = channel->key; msg.stream_count = channel->streams.count; }