DBG("Relay deleting session %" PRIu64, cmd->session->id);
- lttcomm_destroy_sock(cmd->session->sock);
-
rcu_read_lock();
cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
free(cmd->session);
}
+/*
+ * Handle the RELAYD_CREATE_SESSION command.
+ *
+ * On success, send back the session id or else return a negative value.
+ */
+static
+int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret = 0, send_ret;
+ struct relay_session *session;
+ struct lttcomm_relayd_status_session reply;
+
+ assert(recv_hdr);
+ assert(cmd);
+
+ memset(&reply, 0, sizeof(reply));
+
+ session = zmalloc(sizeof(struct relay_session));
+ if (session == NULL) {
+ PERROR("relay session zmalloc");
+ ret = -1;
+ goto error;
+ }
+
+ session->id = ++last_relay_session_id;
+ session->sock = cmd->sock;
+ cmd->session = session;
+
+ reply.session_id = htobe64(session->id);
+
+ DBG("Created session %" PRIu64, session->id);
+
+error:
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_FATAL);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relayd sending session id");
+ }
+
+ return ret;
+}
+
/*
* relay_add_stream: allocate a new stream for a session
*/
char *path = NULL, *root_path = NULL;
int ret, send_ret;
- if (!session || session->version_check_done == 0) {
+ if (!session || cmd->version_check_done == 0) {
ERR("Trying to add a stream before version check");
ret = -1;
goto end_no_session;
DBG("Close stream received");
- if (!session || session->version_check_done == 0) {
+ if (!session || cmd->version_check_done == 0) {
ERR("Trying to close a stream before version check");
ret = -1;
goto end_no_session;
{
int ret;
struct lttcomm_relayd_version reply, msg;
- struct relay_session *session;
- if (cmd->session == NULL) {
- session = zmalloc(sizeof(struct relay_session));
- if (session == NULL) {
- PERROR("relay session zmalloc");
- ret = -1;
- goto end;
- }
- session->id = ++last_relay_session_id;
- DBG("Created session %" PRIu64, session->id);
- cmd->session = session;
- } else {
- session = cmd->session;
- }
- session->version_check_done = 1;
+ assert(cmd);
+
+ cmd->version_check_done = 1;
/* Get version from the other side. */
ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
DBG("Data pending command received");
- if (!session || session->version_check_done == 0) {
+ if (!session || cmd->version_check_done == 0) {
ERR("Trying to check for data before version check");
ret = -1;
goto end_no_session;
int ret = 0;
switch (be32toh(recv_hdr->cmd)) {
- /*
case RELAYD_CREATE_SESSION:
ret = relay_create_session(recv_hdr, cmd);
break;
- */
case RELAYD_ADD_STREAM:
ret = relay_add_stream(recv_hdr, cmd, streams_ht);
break;
return ret;
}
+/*
+ * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
+ * set session_id of the relayd if we have a successful reply from the relayd.
+ *
+ * On success, return 0 else a negative value being a lttng_error_code returned
+ * from the relayd.
+ */
+int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id)
+{
+ int ret;
+ struct lttcomm_relayd_status_session reply;
+
+ assert(sock);
+ assert(session_id);
+
+ DBG("Relayd create session");
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.session_id = be64toh(reply.session_id);
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -reply.ret_code;
+ ERR("Relayd create session replied error %d", ret);
+ goto error;
+ } else {
+ ret = 0;
+ *session_id = reply.session_id;
+ }
+
+ DBG("Relayd session created with id %" PRIu64, reply.session_id);
+
+error:
+ return ret;
+}
+
/*
* Add stream on the relayd and assign stream handle to the stream_id argument.
*
int relayd_connect(struct lttcomm_sock *sock);
int relayd_close(struct lttcomm_sock *sock);
-#if 0
-int relayd_create_session(struct lttcomm_sock *sock, const char *hostname,
- const char *session_name);
-#endif
+int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id);
int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
const char *pathname, uint64_t *stream_id);
int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,