* Returns 0 on success or a negative value on error.
*/
static
-int init_viewer_stream(struct relay_stream *stream,
- struct lttng_ht *viewer_streams_ht)
+int init_viewer_stream(struct relay_stream *stream)
{
int ret;
struct relay_viewer_stream *viewer_stream;
assert(stream);
- assert(viewer_streams_ht);
viewer_stream = zmalloc(sizeof(*viewer_stream));
if (!viewer_stream) {
*/
static
int viewer_attach_session(struct relay_command *cmd,
- struct lttng_ht *sessions_ht,
- struct lttng_ht *viewer_streams_ht)
+ struct lttng_ht *sessions_ht)
{
int ret, send_streams = 0, nb_streams = 0;
struct lttng_viewer_attach_session_request request;
assert(cmd);
assert(sessions_ht);
- assert(viewer_streams_ht);
DBG("Attach session received");
continue;
}
- vstream = live_find_viewer_stream_by_id(stream->stream_handle,
- viewer_streams_ht);
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
if (!vstream) {
- ret = init_viewer_stream(stream, viewer_streams_ht);
+ ret = init_viewer_stream(stream);
if (ret < 0) {
goto end_unlock;
}
*
* RCU read side lock MUST be acquired.
*/
-struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id,
- struct lttng_ht *viewer_streams_ht)
+struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
struct relay_viewer_stream *stream = NULL;
- assert(viewer_streams_ht);
-
lttng_ht_lookup(viewer_streams_ht, &stream_id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node == NULL) {
*/
static
int viewer_get_next_index(struct relay_command *cmd,
- struct lttng_ht *viewer_streams_ht, struct lttng_ht *sessions_ht)
+ struct lttng_ht *sessions_ht)
{
int ret;
struct lttng_viewer_get_next_index request_index;
struct relay_stream *rstream;
assert(cmd);
- assert(viewer_streams_ht);
assert(sessions_ht);
DBG("Viewer get next index");
}
rcu_read_lock();
- vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id),
- viewer_streams_ht);
+ vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id));
if (!vstream) {
ret = -1;
goto end_unlock;
* Return 0 on success or else a negative value.
*/
static
-int viewer_get_packet(struct relay_command *cmd,
- struct lttng_ht *viewer_streams_ht)
+int viewer_get_packet(struct relay_command *cmd)
{
int ret, send_data = 0;
char *data = NULL;
struct relay_viewer_stream *stream;
assert(cmd);
- assert(viewer_streams_ht);
DBG2("Relay get data packet");
}
rcu_read_lock();
- stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id),
- viewer_streams_ht);
+ stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id));
if (!stream) {
goto error;
}
* Return 0 on success else a negative value.
*/
static
-int viewer_get_metadata(struct relay_command *cmd,
- struct lttng_ht *viewer_streams_ht)
+int viewer_get_metadata(struct relay_command *cmd)
{
int ret = 0;
ssize_t read_len;
struct relay_viewer_stream *stream;
assert(cmd);
- assert(viewer_streams_ht);
DBG("Relay get metadata");
}
rcu_read_lock();
- stream = live_find_viewer_stream_by_id(be64toh(request.stream_id),
- viewer_streams_ht);
+ stream = live_find_viewer_stream_by_id(be64toh(request.stream_id));
if (!stream || !stream->metadata_flag) {
ERR("Invalid metadata stream");
goto error;
*/
static
int process_control(struct lttng_viewer_cmd *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *sessions_ht,
- struct lttng_ht *viewer_streams_ht)
+ struct relay_command *cmd, struct lttng_ht *sessions_ht)
{
int ret = 0;
ret = viewer_list_sessions(cmd, sessions_ht);
break;
case VIEWER_ATTACH_SESSION:
- ret = viewer_attach_session(cmd, sessions_ht,
- viewer_streams_ht);
+ ret = viewer_attach_session(cmd, sessions_ht);
break;
case VIEWER_GET_NEXT_INDEX:
- ret = viewer_get_next_index(cmd, viewer_streams_ht, sessions_ht);
+ ret = viewer_get_next_index(cmd, sessions_ht);
break;
case VIEWER_GET_PACKET:
- ret = viewer_get_packet(cmd, viewer_streams_ht);
+ ret = viewer_get_packet(cmd);
break;
case VIEWER_GET_METADATA:
- ret = viewer_get_metadata(cmd, viewer_streams_ht);
+ ret = viewer_get_metadata(cmd);
break;
default:
ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
}
static
-void viewer_del_streams(struct lttng_ht *viewer_streams_ht,
- struct relay_session *session)
+void viewer_del_streams(struct relay_session *session)
{
int ret;
struct relay_viewer_stream *stream;
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- assert(viewer_streams_ht);
assert(session);
rcu_read_lock();
*/
static
void del_connection(struct lttng_ht *relay_connections_ht,
- struct lttng_ht_iter *iter, struct relay_command *relay_connection,
- struct lttng_ht *viewer_streams_ht)
+ struct lttng_ht_iter *iter, struct relay_command *relay_connection)
{
int ret;
assert(relay_connections_ht);
assert(iter);
assert(relay_connection);
- assert(viewer_streams_ht);
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
if (relay_connection->session) {
- viewer_del_streams(viewer_streams_ht, relay_connection->session);
+ viewer_del_streams(relay_connection->session);
}
call_rcu(&relay_connection->rcu_node, deferred_free_connection);
struct lttng_viewer_cmd recv_hdr;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
- struct lttng_ht *viewer_streams_ht = relay_ctx->viewer_streams_ht;
DBG("[thread] Live viewer relay worker started");
ERR("VIEWER POLL ERROR");
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Viewer socket %d hung up", pollfd);
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
} else if (revents & LPOLLIN) {
ret = relay_connection->sock->ops->recvmsg(
relay_connection->sock, &recv_hdr,
if (ret <= 0) {
cleanup_poll_connection(&events, pollfd);
del_connection( relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
DBG("Viewer control connection closed with %d",
pollfd);
} else {
relay_connection->session->id);
}
ret = process_control(&recv_hdr, relay_connection,
- sessions_ht, viewer_streams_ht);
+ sessions_ht);
if (ret < 0) {
/* Clear the session on error. */
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
DBG("Viewer connection closed with %d", pollfd);
}
}
relay_connection = caa_container_of(node, struct relay_command,
sock_n);
- del_connection(relay_connections_ht, &iter, relay_connection,
- viewer_streams_ht);
+ del_connection(relay_connections_ht, &iter, relay_connection);
}
rcu_read_unlock();
error_poll_create:
/* Global relay stream hash table. */
struct lttng_ht *relay_streams_ht;
+/* Global relay viewer stream hash table. */
+struct lttng_ht *viewer_streams_ht;
+
/*
* usage function on stderr
*/
}
static void close_stream(struct relay_stream *stream,
- struct lttng_ht *viewer_streams_ht, struct lttng_ht *ctf_traces_ht)
+ struct lttng_ht *ctf_traces_ht)
{
int delret;
struct relay_viewer_stream *vstream;
struct lttng_ht_iter iter;
assert(stream);
- assert(viewer_streams_ht);
delret = close(stream->fd);
if (delret < 0) {
}
}
- vstream = live_find_viewer_stream_by_id(stream->stream_handle,
- viewer_streams_ht);
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
if (vstream) {
/*
* Set the last good value into the viewer stream. This is done
*/
static
int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *viewer_streams_ht)
+ struct relay_command *cmd)
{
int ret, send_ret;
struct relay_session *session = cmd->session;
stream->close_flag = 1;
if (close_stream_check(stream)) {
- close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
+ close_stream(stream, cmd->ctf_traces_ht);
}
end_unlock:
ret = relay_send_version(recv_hdr, cmd, ctx->sessions_ht);
break;
case RELAYD_CLOSE_STREAM:
- ret = relay_close_stream(recv_hdr, cmd, ctx->viewer_streams_ht);
+ ret = relay_close_stream(recv_hdr, cmd);
break;
case RELAYD_DATA_PENDING:
ret = relay_data_pending(recv_hdr, cmd);
*/
static
int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht, struct lttng_ht *viewer_streams_ht)
+ struct lttng_ht *indexes_ht)
{
int ret = 0, rotate_index = 0, index_created = 0;
struct relay_stream *stream;
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
+ close_stream(stream, cmd->ctf_traces_ht);
}
end_rcu_unlock:
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht,
- relay_ctx->viewer_streams_ht);
+ ret = relay_process_data(relay_connection, indexes_ht);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
}
/* tables of streams indexed by stream ID */
- relay_ctx->viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!relay_ctx->viewer_streams_ht) {
+ viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!viewer_streams_ht) {
goto exit_relay_ctx_viewer_streams;
}
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
- lttng_ht_destroy(relay_ctx->viewer_streams_ht);
+ lttng_ht_destroy(viewer_streams_ht);
exit_relay_ctx_viewer_streams:
lttng_ht_destroy(relay_streams_ht);