consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
+
+/*
+ * Process the ADD_RELAYD command receive by a consumer.
+ *
+ * This will create a relayd socket pair and add it to the relayd hash table.
+ * The caller MUST acquire a RCU read side lock before calling it.
+ */
+int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+ struct lttng_consumer_local_data *ctx, int sock,
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+{
+ int fd, ret = -1;
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(net_seq_idx);
+ if (relayd == NULL) {
+ /* Not found. Allocate one. */
+ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
+ if (relayd == NULL) {
+ lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+ goto error;
+ }
+ }
+
+ /* Poll on consumer socket. */
+ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = -EINTR;
+ goto error;
+ }
+
+ /* Get relayd socket from session daemon */
+ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+ if (ret != sizeof(fd)) {
+ lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ ret = -1;
+ goto error;
+ }
+
+ /* Copy socket information and received FD */
+ switch (sock_type) {
+ case LTTNG_STREAM_CONTROL:
+ /* Copy received lttcomm socket */
+ lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
+ ret = lttcomm_create_sock(&relayd->control_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Close the created socket fd which is useless */
+ close(relayd->control_sock.fd);
+
+ /* Assign new file descriptor */
+ relayd->control_sock.fd = fd;
+ break;
+ case LTTNG_STREAM_DATA:
+ /* Copy received lttcomm socket */
+ lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
+ ret = lttcomm_create_sock(&relayd->data_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Close the created socket fd which is useless */
+ close(relayd->data_sock.fd);
+
+ /* Assign new file descriptor */
+ relayd->data_sock.fd = fd;
+ break;
+ default:
+ ERR("Unknown relayd socket type (%d)", sock_type);
+ goto error;
+ }
+
+ DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+ sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
+ relayd->net_seq_idx, fd);
+
+ /*
+ * Add relayd socket pair to consumer data hashtable. If object already
+ * exists or on error, the function gracefully returns.
+ */
+ consumer_add_relayd(relayd);
+
+ /* All good! */
+ ret = 0;
+
+error:
+ return ret;
+}
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
+int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+ struct lttng_consumer_local_data *ctx, int sock,
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
#endif /* LIB_CONSUMER_H */
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
- int fd;
- struct consumer_relayd_sock_pair *relayd;
-
- DBG("Consumer adding relayd socket");
-
- /* Get relayd reference if exists. */
- relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
- if (relayd == NULL) {
- /* Not found. Allocate one. */
- relayd = consumer_allocate_relayd_sock_pair(
- msg.u.relayd_sock.net_index);
- if (relayd == NULL) {
- lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
- goto end_nosignal;
- }
- }
-
- /* Poll on consumer socket. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
- rcu_read_unlock();
- return -EINTR;
- }
-
- /* Get relayd socket from session daemon */
- ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
- if (ret != sizeof(fd)) {
- lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
- goto end_nosignal;
- }
-
- /* Copy socket information and received FD */
- switch (msg.u.relayd_sock.type) {
- case LTTNG_STREAM_CONTROL:
- /* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
-
- ret = lttcomm_create_sock(&relayd->control_sock);
- if (ret < 0) {
- goto end_nosignal;
- }
-
- /* Close the created socket fd which is useless */
- close(relayd->control_sock.fd);
-
- /* Assign new file descriptor */
- relayd->control_sock.fd = fd;
- break;
- case LTTNG_STREAM_DATA:
- /* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
- ret = lttcomm_create_sock(&relayd->data_sock);
- if (ret < 0) {
- goto end_nosignal;
- }
-
- /* Close the created socket fd which is useless */
- close(relayd->data_sock.fd);
-
- /* Assign new file descriptor */
- relayd->data_sock.fd = fd;
- break;
- default:
- ERR("Unknown relayd socket type");
- goto end_nosignal;
- }
-
- DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
- msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
- relayd->net_seq_idx, fd);
-
- /*
- * Add relayd socket pair to consumer data hashtable. If object already
- * exists or on error, the function gracefully returns.
- */
- consumer_add_relayd(relayd);
-
+ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
+ &msg.u.relayd_sock.sock);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
- int fd;
- struct consumer_relayd_sock_pair *relayd;
-
- DBG("UST Consumer adding relayd socket");
-
- /* Get relayd reference if exists. */
- relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
- if (relayd == NULL) {
- /* Not found. Allocate one. */
- relayd = consumer_allocate_relayd_sock_pair(
- msg.u.relayd_sock.net_index);
- if (relayd == NULL) {
- lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
- goto end_nosignal;
- }
- }
-
- /* Poll on consumer socket. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
- rcu_read_unlock();
- return -EINTR;
- }
-
- /* Get relayd socket from session daemon */
- ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
- if (ret != sizeof(fd)) {
- lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
- goto end_nosignal;
- }
-
- /* Copy socket information and received FD */
- switch (msg.u.relayd_sock.type) {
- case LTTNG_STREAM_CONTROL:
- /* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
- ret = lttcomm_create_sock(&relayd->control_sock);
- if (ret < 0) {
- goto end_nosignal;
- }
-
- /* Close the created socket fd which is useless */
- close(relayd->control_sock.fd);
-
- /* Assign new file descriptor */
- relayd->control_sock.fd = fd;
- break;
- case LTTNG_STREAM_DATA:
- /* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
- ret = lttcomm_create_sock(&relayd->data_sock);
- if (ret < 0) {
- goto end_nosignal;
- }
-
- /* Close the created socket fd which is useless */
- close(relayd->data_sock.fd);
-
- /* Assign new file descriptor */
- relayd->data_sock.fd = fd;
- break;
- default:
- ERR("Unknown relayd socket type");
- goto end_nosignal;
- }
-
- DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
- msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
- relayd->net_seq_idx, fd);
-
- /*
- * Add relayd socket pair to consumer data hashtable. If object already
- * exists or on error, the function gracefully returns.
- */
- consumer_add_relayd(relayd);
-
+ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
+ &msg.u.relayd_sock.sock);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
ret = write(ctx->consumer_poll_pipe[1], "", 1);
} while (ret < 0 && errno == EINTR);
end_nosignal:
+ /* XXX: At some point we might want to return something else than zero */
rcu_read_unlock();
return 0;
}