X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=06b59c58442e4a833b3b0947aa3d34244b28a305;hb=31fa4745f181bd1bdbceb89fbe27e130f5b4e2b9;hp=a6a4f1a91735911633a8d85d098e63cd01425417;hpb=d88aee689d5bd0067f362a323cb69c37717df59f;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index a6a4f1a91..06b59c584 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -88,14 +88,14 @@ static int add_channel(struct lttng_consumer_channel *channel, if (ctx->on_recv_channel != NULL) { ret = ctx->on_recv_channel(channel); if (ret == 0) { - ret = consumer_add_channel(channel); + ret = consumer_add_channel(channel, ctx); } else if (ret < 0) { /* Most likely an ENOMEM. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto error; } } else { - ret = consumer_add_channel(channel); + ret = consumer_add_channel(channel, ctx); } DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key); @@ -256,7 +256,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, while ((ustream = ustctl_create_stream(channel->uchan, cpu))) { int wait_fd; - wait_fd = ustctl_get_wait_fd(ustream); + wait_fd = ustctl_stream_get_wait_fd(ustream); /* Allocate consumer stream object. */ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret); @@ -368,11 +368,6 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream) goto error; } - ret = ustctl_stream_close_wakeup_fd(stream->ustream); - if (ret < 0) { - goto error; - } - error: return ret; } @@ -401,6 +396,11 @@ static int send_sessiond_channel(int sock, goto error; } + ret = ustctl_channel_close_wakeup_fd(channel->uchan); + if (ret < 0) { + goto error; + } + /* The channel was sent successfully to the sessiond at this point. */ cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* Try to send the stream to the relayd if one is available. */ @@ -476,6 +476,12 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock, goto error; } + channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan); + + if (ret < 0) { + goto error; + } + /* Open all streams for this channel. */ ret = create_ust_streams(channel, ctx); if (ret < 0) { @@ -551,6 +557,43 @@ error: return ret; } +/* + * Flush channel's streams using the given key to retrieve the channel. + * + * Return 0 on success else an LTTng error code. + */ +static int flush_channel(uint64_t chan_key) +{ + int ret = 0; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht *ht; + struct lttng_ht_iter iter; + + DBG("UST consumer flush channel key %lu", chan_key); + + channel = consumer_find_channel(chan_key); + if (!channel) { + ERR("UST consumer flush channel %lu not found", chan_key); + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto error; + } + + ht = consumer_data.stream_per_chan_id_ht; + + /* For each stream of the channel id, flush it. */ + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, + &channel->key, &iter.iter, stream, node_channel_id.node) { + ustctl_flush_buffer(stream->ustream, 1); + } + rcu_read_unlock(); + +error: + return ret; +} + /* * Close metadata stream wakeup_fd using the given key to retrieve the channel. * @@ -761,6 +804,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, attr.overwrite = msg.u.ask_channel.overwrite; attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval; attr.read_timer_interval = msg.u.ask_channel.read_timer_interval; + attr.chan_id = msg.u.ask_channel.chan_id; memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid)); /* Translate the event output type to UST. */ @@ -898,6 +942,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_msg_sessiond; } + case LTTNG_CONSUMER_FLUSH_CHANNEL: + { + int ret; + + ret = flush_channel(msg.u.flush_channel.key); + if (ret != 0) { + ret_code = ret; + } + + goto end_msg_sessiond; + } case LTTNG_CONSUMER_PUSH_METADATA: { int ret; @@ -913,6 +968,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel) { ERR("UST consumer push metadata %lu not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto end_msg_sessiond; } metadata_str = zmalloc(len * sizeof(char)); @@ -1277,3 +1333,13 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht) } rcu_read_unlock(); } + +void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) +{ + int ret; + + ret = ustctl_stream_close_wakeup_fd(stream->ustream); + if (ret < 0) { + ERR("Unable to close wakeup fd"); + } +}