/*
* Used to keep a unique index for each relayd socket created where this value
* is associated with streams on the consumer so it can match the right relayd
- * to send to.
- *
- * This value should be incremented atomically for safety purposes and future
- * possible concurrent access.
+ * to send to. It must be accessed with the relayd_net_seq_idx_lock
+ * held.
*/
-static unsigned int relayd_net_seq_idx;
+static pthread_mutex_t relayd_net_seq_idx_lock = PTHREAD_MUTEX_INITIALIZER;
+static uint64_t relayd_net_seq_idx;
/*
* Create a session path used by list_lttng_sessions for the case that the
}
/* Set the network sequence index if not set. */
- if (consumer->net_seq_index == -1) {
+ if (consumer->net_seq_index == (uint64_t) -1ULL) {
+ pthread_mutex_lock(&relayd_net_seq_idx_lock);
/*
* Increment net_seq_idx because we are about to transfer the
* new relayd socket to the consumer.
+ * Assign unique key so the consumer can match streams.
*/
- uatomic_inc(&relayd_net_seq_idx);
- /* Assign unique key so the consumer can match streams */
- uatomic_set(&consumer->net_seq_index,
- uatomic_read(&relayd_net_seq_idx));
+ consumer->net_seq_index = ++relayd_net_seq_idx;
+ pthread_mutex_unlock(&relayd_net_seq_idx_lock);
}
/* Send relayd socket to consumer. */
void cmd_init(void)
{
/*
- * Set network sequence index to 1 for streams to match a relayd socket on
- * the consumer side.
+ * Set network sequence index to 1 for streams to match a relayd
+ * socket on the consumer side.
*/
- uatomic_set(&relayd_net_seq_idx, 1);
+ pthread_mutex_lock(&relayd_net_seq_idx_lock);
+ relayd_net_seq_idx = 1;
+ pthread_mutex_unlock(&relayd_net_seq_idx_lock);
DBG("Command subsystem initialized");
}
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/defaults.h>
* negative value is sent back and both parameters are untouched.
*/
int consumer_recv_status_channel(struct consumer_socket *sock,
- unsigned long *key, unsigned int *stream_count)
+ uint64_t *key, unsigned int *stream_count)
{
int ret;
struct lttcomm_consumer_status_channel reply;
/* By default, consumer output is enabled */
output->enabled = 1;
output->type = type;
- output->net_seq_index = -1;
+ output->net_seq_index = (uint64_t) -1ULL;
output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
- unsigned long key,
+ uint64_t relayd_id,
+ uint64_t key,
unsigned char *uuid)
{
assert(msg);
*/
void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
- int channel_key,
+ uint64_t channel_key,
uint64_t session_id,
const char *pathname,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
const char *name,
unsigned int nb_init_streams,
enum lttng_event_output output,
*/
void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
- int channel_key,
- int stream_key,
+ uint64_t channel_key,
+ uint64_t stream_key,
int cpu)
{
assert(msg);
*/
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
- enum lttng_stream_type type, unsigned int session_id)
+ enum lttng_stream_type type, uint64_t session_id)
{
int ret;
struct lttcomm_consumer_msg msg;
* This function has a different behavior with the consumer i.e. that it waits
* for a reply from the consumer if yes or no the data is pending.
*/
-int consumer_is_data_pending(unsigned int id,
+int consumer_is_data_pending(uint64_t session_id,
struct consumer_output *consumer)
{
int ret;
msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
- msg.u.data_pending.session_id = (uint64_t) id;
+ msg.u.data_pending.session_id = session_id;
- DBG3("Consumer data pending for id %u", id);
+ DBG3("Consumer data pending for id %" PRIu64, session_id);
/* Send command for each consumer */
rcu_read_lock();
}
rcu_read_unlock();
- DBG("Consumer data is %s pending for session id %u",
- ret_code == 1 ? "" : "NOT", id);
+ DBG("Consumer data is %s pending for session id %" PRIu64,
+ ret_code == 1 ? "" : "NOT", session_id);
return ret_code;
error_unlock:
* side. It tells the consumer which streams goes to which relayd with this
* index. The relayd sockets are index with it on the consumer side.
*/
- int net_seq_index;
+ uint64_t net_seq_index;
/*
* Subdirectory path name used for both local and network consumer.
struct lttcomm_consumer_msg *msg);
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
- enum lttng_stream_type type, unsigned int session_id);
+ enum lttng_stream_type type, uint64_t session_id);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
struct consumer_output *consumer);
int consumer_recv_status_reply(struct consumer_socket *sock);
int consumer_recv_status_channel(struct consumer_socket *sock,
- unsigned long *key, unsigned int *stream_count);
+ uint64_t *key, unsigned int *stream_count);
void consumer_output_send_destroy_relayd(struct consumer_output *consumer);
int consumer_create_socket(struct consumer_data *data,
struct consumer_output *output);
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
- unsigned long key,
+ uint64_t relayd_id,
+ uint64_t key,
unsigned char *uuid);
void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
- int channel_key,
- int stream_key,
+ uint64_t channel_key,
+ uint64_t stream_key,
int cpu);
void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
- int channel_key,
+ uint64_t channel_key,
uint64_t session_id,
const char *pathname,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
const char *name,
unsigned int nb_init_streams,
enum lttng_event_output output,
int type);
-int consumer_is_data_pending(unsigned int id,
+int consumer_is_data_pending(uint64_t session_id,
struct consumer_output *consumer);
#endif /* _CONSUMER_H */
}
/*
- * Send the application sockets (cmd and notify) to the respective threads.
- * This is called from the dispatch UST registration thread once all sockets
- * are set for the application.
+ * Send a socket to a thread This is called from the dispatch UST registration
+ * thread once all sockets are set for the application.
*
* On success, return 0 else a negative value being the errno message of the
* write().
*/
-static int send_app_sockets_to_threads(struct ust_app *app)
+static int send_socket_to_thread(int fd, int sock)
{
int ret;
- assert(app);
/* Sockets MUST be set or else this should not have been called. */
- assert(app->sock >= 0);
- assert(app->notify_sock >= 0);
- assert(apps_cmd_pipe[1] >= 0);
- assert(apps_cmd_notify_pipe[1] >= 0);
+ assert(fd >= 0);
+ assert(sock >= 0);
do {
- ret = write(apps_cmd_pipe[1], &app->sock, sizeof(app->sock));
+ ret = write(fd, &sock, sizeof(sock));
} while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != sizeof(app->sock)) {
- PERROR("write apps cmd pipe %d", apps_cmd_pipe[1]);
- if (ret < 0) {
- ret = -errno;
- }
- goto error;
- }
-
- do {
- ret = write(apps_cmd_notify_pipe[1], &app->notify_sock,
- sizeof(app->notify_sock));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != sizeof(app->notify_sock)) {
- PERROR("write apps notify cmd pipe %d", apps_cmd_notify_pipe[1]);
+ if (ret < 0 || ret != sizeof(sock)) {
+ PERROR("write apps pipe %d", fd);
if (ret < 0) {
ret = -errno;
}
struct {
struct ust_app *app;
struct cds_list_head head;
- } *wait_node = NULL;
+ } *wait_node = NULL, *tmp_wait_node;
CDS_LIST_HEAD(wait_queue);
if (ret < 0) {
PERROR("close ust sock dispatch %d", ust_cmd->sock);
}
+ lttng_fd_put(1, LTTNG_FD_APPS);
+ free(wait_node);
continue;
}
/*
* Look for the application in the local wait queue and set the
* notify socket if found.
*/
- cds_list_for_each_entry(wait_node, &wait_queue, head) {
+ cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+ &wait_queue, head) {
if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
wait_node->app->notify_sock = ust_cmd->sock;
cds_list_del(&wait_node->head);
}
if (app) {
- ret = send_app_sockets_to_threads(app);
- if (ret < 0) {
- goto error;
- }
/*
* @session_lock_list
*
*/
session_lock_list();
rcu_read_lock();
+
/*
* Add application to the global hash table. This needs to be
* done before the update to the UST registry can locate the
* application.
*/
ust_app_add(app);
- /*
- * Get app version.
- */
- ret = ust_app_version(app);
- if (ret) {
- ERR("Unable to get app version");
+
+ /* Set app version. This call will print an error if needed. */
+ (void) ust_app_version(app);
+
+ /* Send notify socket through the notify pipe. */
+ ret = send_socket_to_thread(apps_cmd_notify_pipe[1],
+ app->notify_sock);
+ if (ret < 0) {
+ rcu_read_unlock();
+ session_unlock_list();
+ /* No notify thread, stop the UST tracing. */
+ goto error;
}
+
/*
* Update newly registered application with the tracing
* registry info already enabled information.
*/
update_ust_app(app->sock);
- ret = ust_app_register_done(app->sock);
+
+ /*
+ * Don't care about return value. Let the manage apps threads
+ * handle app unregistration upon socket close.
+ */
+ (void) ust_app_register_done(app->sock);
+
+ /*
+ * Even if the application socket has been closed, send the app
+ * to the thread and unregistration will take place at that
+ * place.
+ */
+ ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock);
if (ret < 0) {
- /* Remove application from the registry. */
- ust_app_unregister(app->sock);
+ rcu_read_unlock();
+ session_unlock_list();
+ /* No apps. thread, stop the UST tracing. */
+ goto error;
}
+
rcu_read_unlock();
session_unlock_list();
} else {
if (ret < 0) {
PERROR("close ust_cmd sock");
}
+ lttng_fd_put(1, LTTNG_FD_APPS);
}
free(ust_cmd);
} while (node != NULL);
}
error:
+ /* Clean up wait queue. */
+ cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+ &wait_queue, head) {
+ cds_list_del(&wait_node->head);
+ free(wait_node);
+ }
+
DBG("Dispatch thread dying");
return NULL;
}
sock = -1;
continue;
}
+
health_code_update();
ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg);
if (ret < 0) {
}
socket = consumer_find_socket(consumer_fd, consumer);
+ if (!socket) {
+ ERR("Consumer socket fd %d not found in consumer obj %p",
+ consumer_fd, consumer);
+ }
end:
return socket;
assert(node_ptr == &event->node.node);
}
+/*
+ * Close the notify socket from the given RCU head object. This MUST be called
+ * through a call_rcu().
+ */
+static void close_notify_sock_rcu(struct rcu_head *head)
+{
+ int ret;
+ struct ust_app_notify_sock_obj *obj =
+ caa_container_of(head, struct ust_app_notify_sock_obj, head);
+
+ /* Must have a valid fd here. */
+ assert(obj->fd >= 0);
+
+ ret = close(obj->fd);
+ if (ret) {
+ ERR("close notify sock %d RCU", obj->fd);
+ }
+ lttng_fd_put(LTTNG_FD_APPS, 1);
+
+ free(obj);
+}
+
/*
* Delete ust context safely. RCU read lock must be held before calling
* this function.
free(ua_chan);
}
+/*
+ * For a given application and session, push metadata to consumer. The session
+ * lock MUST be acquired here before calling this.
+ *
+ * Return 0 on success else a negative error.
+ */
+static int push_metadata(struct ust_app *app, struct ust_app_session *ua_sess)
+{
+ int ret;
+ char *metadata_str = NULL;
+ size_t len, offset;
+ struct consumer_socket *socket;
+
+ assert(app);
+ assert(ua_sess);
+
+ if (!ua_sess->consumer || !ua_sess->metadata) {
+ /* No consumer means no stream associated so just return gracefully. */
+ ret = 0;
+ goto end;
+ }
+
+ rcu_read_lock();
+
+ /* Get consumer socket to use to push the metadata.*/
+ socket = find_consumer_socket_by_bitness(app->bits_per_long,
+ ua_sess->consumer);
+ if (!socket) {
+ ret = -1;
+ goto error_rcu_unlock;
+ }
+
+ /*
+ * TODO: Currently, we hold the socket lock around sampling of the next
+ * metadata segment to ensure we send metadata over the consumer socket in
+ * the correct order. This makes the registry lock nest inside the socket
+ * lock.
+ *
+ * Please note that this is a temporary measure: we should move this lock
+ * back into ust_consumer_push_metadata() when the consumer gets the
+ * ability to reorder the metadata it receives.
+ */
+ pthread_mutex_lock(socket->lock);
+ pthread_mutex_lock(&ua_sess->registry.lock);
+
+ offset = ua_sess->registry.metadata_len_sent;
+ len = ua_sess->registry.metadata_len - ua_sess->registry.metadata_len_sent;
+ if (len == 0) {
+ DBG3("No metadata to push for session id %d", ua_sess->id);
+ ret = 0;
+ goto error_reg_unlock;
+ }
+ assert(len > 0);
+
+ /* Allocate only what we have to send. */
+ metadata_str = zmalloc(len);
+ if (!metadata_str) {
+ PERROR("zmalloc ust app metadata string");
+ ret = -ENOMEM;
+ goto error_reg_unlock;
+ }
+ /* Copy what we haven't send out. */
+ memcpy(metadata_str, ua_sess->registry.metadata + offset, len);
+
+ pthread_mutex_unlock(&ua_sess->registry.lock);
+
+ ret = ust_consumer_push_metadata(socket, ua_sess, metadata_str, len,
+ offset);
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto error_rcu_unlock;
+ }
+
+ /* Update len sent of the registry. */
+ pthread_mutex_lock(&ua_sess->registry.lock);
+ ua_sess->registry.metadata_len_sent += len;
+ pthread_mutex_unlock(&ua_sess->registry.lock);
+ pthread_mutex_unlock(socket->lock);
+
+ rcu_read_unlock();
+ free(metadata_str);
+ return 0;
+
+error_reg_unlock:
+ pthread_mutex_unlock(&ua_sess->registry.lock);
+ pthread_mutex_unlock(socket->lock);
+error_rcu_unlock:
+ rcu_read_unlock();
+ free(metadata_str);
+end:
+ return ret;
+}
+
+/*
+ * Send to the consumer a close metadata command for the given session. Once
+ * done, the metadata channel is deleted and the session metadata pointer is
+ * nullified. The session lock MUST be acquired here unless the application is
+ * in the destroy path.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int close_metadata(struct ust_app *app, struct ust_app_session *ua_sess)
+{
+ int ret;
+ struct consumer_socket *socket;
+
+ assert(app);
+ assert(ua_sess);
+
+ /* Ignore if no metadata. Valid since it can be called on unregister. */
+ if (!ua_sess->metadata) {
+ ret = 0;
+ goto error;
+ }
+
+ rcu_read_lock();
+
+ /* Get consumer socket to use to push the metadata.*/
+ socket = find_consumer_socket_by_bitness(app->bits_per_long,
+ ua_sess->consumer);
+ if (!socket) {
+ ret = -1;
+ goto error_rcu_unlock;
+ }
+
+ ret = ust_consumer_close_metadata(socket, ua_sess->metadata);
+ if (ret < 0) {
+ goto error_rcu_unlock;
+ }
+
+error_rcu_unlock:
+ /* Destroy metadata on our side since we must not use it anymore. */
+ delete_ust_app_channel(-1, ua_sess->metadata, app);
+ ua_sess->metadata = NULL;
+
+ rcu_read_unlock();
+error:
+ return ret;
+}
+
/*
* Delete ust app session safely. RCU read lock must be held before calling
* this function.
struct lttng_ht_iter iter;
struct ust_app_channel *ua_chan;
+ assert(ua_sess);
+
if (ua_sess->metadata) {
- delete_ust_app_channel(sock, ua_sess->metadata, app);
+ /* Push metadata for application before freeing the application. */
+ (void) push_metadata(app, ua_sess);
+
+ /* And ask to close it for this session. */
+ (void) close_metadata(app, ua_sess);
}
cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
}
/*
- * Create the specified channel onto the UST tracer for a UST session.
- * Called with UST app session lock held.
+ * Create the specified channel onto the UST tracer for a UST session. This
+ * MUST be called with UST app session lock held.
*
* Return 0 on success. On error, a negative value is returned.
*/
assert(ua_chan);
assert(consumer);
+ rcu_read_lock();
health_code_update();
/* Get the right consumer socket for the application. */
/*
* Compute the number of fd needed before receiving them. It must be 2 per
- * stream.
+ * stream (2 being the default value here).
*/
nb_fd = DEFAULT_UST_STREAM_FD_NUM * ua_chan->expected_stream_count;
goto error;
}
+ health_code_update();
+
/* Send all streams to application. */
cds_list_for_each_entry_safe(stream, stmp, &ua_chan->streams.head, list) {
ret = ust_consumer_send_stream_to_ust(app, ua_chan, stream);
}
}
+ rcu_read_unlock();
return 0;
error_destroy:
(void) ust_consumer_destroy_channel(socket, ua_chan);
error:
health_code_update();
+ rcu_read_unlock();
return ret;
}
DBG2("UST app session created successfully with handle %d", ret);
}
+ /*
+ * Assign consumer if not already set. For one application, there is only
+ * one possible consumer has of now.
+ */
+ if (!ua_sess->consumer) {
+ ua_sess->consumer = usess->consumer;
+ }
+
*ua_sess_ptr = ua_sess;
if (is_created) {
*is_created = created;
}
+
/* Everything went well. */
ret = 0;
{
int ret = 0;
struct ust_app_channel *metadata;
+ struct consumer_socket *socket;
assert(ua_sess);
assert(app);
+ assert(consumer);
if (ua_sess->metadata) {
/* Already exist. Return success. */
metadata->attr.output = LTTNG_UST_MMAP;
metadata->attr.type = LTTNG_UST_CHAN_METADATA;
- ret = create_ust_channel(app, ua_sess, metadata, consumer);
+ /* Get the right consumer socket for the application. */
+ socket = find_consumer_socket_by_bitness(app->bits_per_long, consumer);
+ if (!socket) {
+ ret = -EINVAL;
+ goto error_consumer;
+ }
+
+ /*
+ * Ask the metadata channel creation to the consumer. The metadata object
+ * will be created by the consumer and kept their. However, the stream is
+ * never added or monitored until we do a first push metadata to the
+ * consumer.
+ */
+ ret = ust_consumer_ask_channel(ua_sess, metadata, consumer, socket);
+ if (ret < 0) {
+ goto error_consumer;
+ }
+
+ /*
+ * The setup command will make the metadata stream be sent to the relayd,
+ * if applicable, and the thread managing the metadatas. This is important
+ * because after this point, if an error occurs, the only way the stream
+ * can be deleted is to be monitored in the consumer.
+ */
+ ret = ust_consumer_setup_metadata(socket, metadata);
if (ret < 0) {
- goto error_create;
+ goto error_consumer;
}
ua_sess->metadata = metadata;
- DBG2("UST metadata opened for app pid %d", app->pid);
+ DBG2("UST metadata created for app pid %d", app->pid);
end:
return 0;
-error_create:
- delete_ust_app_channel(metadata->is_sent ? app->sock : -1, metadata, app);
+error_consumer:
+ delete_ust_app_channel(-1, metadata, app);
error:
return ret;
}
}
/*
- * Return ust app pointer or NULL if not found.
+ * Return ust app pointer or NULL if not found. RCU read side lock MUST be
+ * acquired before calling this function.
*/
struct ust_app *ust_app_find_by_pid(pid_t pid)
{
+ struct ust_app *app = NULL;
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
DBG2("Found UST app by pid %d", pid);
- return caa_container_of(node, struct ust_app, pid_n);
+ app = caa_container_of(node, struct ust_app, pid_n);
error:
- rcu_read_unlock();
- return NULL;
+ return app;
}
+/*
+ * Allocate and init an UST app object using the registration information and
+ * the command socket. This is called when the command socket connects to the
+ * session daemon.
+ *
+ * The object is returned on success or else NULL.
+ */
struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock)
{
struct ust_app *lta = NULL;
lta->ppid = msg->ppid;
lta->uid = msg->uid;
lta->gid = msg->gid;
- lta->compatible = 0; /* Not compatible until proven */
lta->bits_per_long = msg->bits_per_long;
lta->uint8_t_alignment = msg->uint8_t_alignment;
lta->v_major = msg->major;
lta->v_minor = msg->minor;
- strncpy(lta->name, msg->name, sizeof(lta->name));
- lta->name[LTTNG_UST_ABI_PROCNAME_LEN] = '\0';
lta->sessions = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
lta->ust_objd = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
lta->notify_sock = -1;
+
+ /* Copy name and make sure it's NULL terminated. */
+ strncpy(lta->name, msg->name, sizeof(lta->name));
+ lta->name[UST_APP_PROCNAME_LEN] = '\0';
+
+ /*
+ * Before this can be called, when receiving the registration information,
+ * the application compatibility is checked. So, at this point, the
+ * application can work with this session daemon.
+ */
lta->compatible = 1;
lta->pid = msg->pid;
return lta;
}
+/*
+ * For a given application object, add it to every hash table.
+ */
void ust_app_add(struct ust_app *app)
{
assert(app);
lttng_ht_add_unique_ulong(ust_app_ht_by_notify_sock, &app->notify_sock_n);
DBG("App registered with pid:%d ppid:%d uid:%d gid:%d sock:%d name:%s "
- "(version %d.%d)", app->pid, app->ppid, app->uid, app->gid,
- app->sock, app->name, app->v_major, app->v_minor);
+ "notify_sock:%d (version %d.%d)", app->pid, app->ppid, app->uid,
+ app->gid, app->sock, app->name, app->notify_sock, app->v_major,
+ app->v_minor);
rcu_read_unlock();
}
+/*
+ * Set the application version into the object.
+ *
+ * Return 0 on success else a negative value either an errno code or a
+ * LTTng-UST error code.
+ */
int ust_app_version(struct ust_app *app)
{
+ int ret;
+
assert(app);
- return ustctl_tracer_version(app->sock, &app->version);
+
+ ret = ustctl_tracer_version(app->sock, &app->version);
+ if (ret < 0) {
+ if (ret != -LTTNG_UST_ERR_EXITING && ret != -EPIPE) {
+ ERR("UST app %d verson failed with ret %d", app->sock, ret);
+ } else {
+ DBG3("UST app %d verion failed. Application is dead", app->sock);
+ }
+ }
+
+ return ret;
}
/*
ret = lttng_ht_del(ust_app_ht_by_sock, &iter);
assert(!ret);
- /* Remove application from notify hash table */
+ /*
+ * Remove application from notify hash table. The thread handling the
+ * notify socket could have deleted the node so ignore on error because
+ * either way it's valid. The close of that socket is handled by the other
+ * thread.
+ */
iter.iter.node = <a->notify_sock_n.node;
- ret = lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+ (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
/*
* Ignore return value since the node might have been removed before by an
* Add session to list for teardown. This is safe since at this point we
* are the only one using this list.
*/
+ pthread_mutex_lock(&ua_sess->lock);
+
+ /*
+ * Normally, this is done in the delete session process which is
+ * executed in the call rcu below. However, upon registration we can't
+ * afford to wait for the grace period before pushing data or else the
+ * data pending feature can race between the unregistration and stop
+ * command where the data pending command is sent *before* the grace
+ * period ended.
+ *
+ * The close metadata below nullifies the metadata pointer in the
+ * session so the delete session will NOT push/close a second time.
+ */
+ (void) push_metadata(lta, ua_sess);
+ (void) close_metadata(lta, ua_sess);
+
cds_list_add(&ua_sess->teardown_node, <a->teardown_head);
+ pthread_mutex_unlock(&ua_sess->lock);
}
/* Free memory */
assert(!ret);
}
+ /* Cleanup notify socket hash table */
+ cds_lfht_for_each_entry(ust_app_ht_by_notify_sock->ht, &iter.iter, app,
+ notify_sock_n.node) {
+ ret = lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+ assert(!ret);
+ }
+
/* Destroy is done only when the ht is empty */
lttng_ht_destroy(ust_app_ht);
lttng_ht_destroy(ust_app_ht_by_sock);
+ lttng_ht_destroy(ust_app_ht_by_notify_sock);
rcu_read_unlock();
}
rcu_read_lock();
if (!app->compatible) {
- goto end;
+ goto end_no_session;
}
ua_sess = lookup_session_by_app(usess, app);
if (ua_sess == NULL) {
- goto end;
+ goto end_no_session;
}
+ pthread_mutex_lock(&ua_sess->lock);
+
/*
* If started = 0, it means that stop trace has been called for a session
* that was never started. It's possible since we can have a fail start
DBG3("UST app failed to flush %s. Application is dead.",
ua_chan->name);
/* No need to continue. */
- goto end;
+ break;
}
/* Continuing flushing all buffers */
continue;
health_code_update();
- assert(ua_sess->metadata->is_sent);
- /* Flush all buffers before stopping */
- ret = ustctl_sock_flush_buffer(app->sock, ua_sess->metadata->obj);
+ ret = push_metadata(app, ua_sess);
if (ret < 0) {
- if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
- ERR("UST app PID %d metadata flush failed with ret %d", app->pid,
- ret);
- goto error_rcu_unlock;
- } else {
- DBG3("UST app failed to flush metadata. Application is dead.");
- }
+ goto error_rcu_unlock;
}
-end:
+ pthread_mutex_unlock(&ua_sess->lock);
+end_no_session:
rcu_read_unlock();
health_code_update();
return 0;
error_rcu_unlock:
+ pthread_mutex_unlock(&ua_sess->lock);
rcu_read_unlock();
health_code_update();
return -1;
ERR("UST app wait quiescent failed for app pid %d ret %d",
app->pid, ret);
}
-
end:
rcu_read_unlock();
health_code_update();
cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
ret = ust_app_stop_trace(usess, app);
if (ret < 0) {
+ ERR("UST app stop trace failed with ret %d", ret);
/* Continue to next apps even on error */
continue;
}
app = find_app_by_sock(sock);
if (app == NULL) {
- ERR("Failed to find app sock %d", sock);
+ /*
+ * Application can be unregistered before so this is possible hence
+ * simply stopping the update.
+ */
+ DBG3("UST app update failed to find app sock %d", sock);
goto error;
}
return ret;
}
+/*
+ * Return a ust app channel object using the application object and the channel
+ * object descriptor has a key. If not found, NULL is returned. A RCU read side
+ * lock MUST be acquired before calling this function.
+ */
static struct ust_app_channel *find_channel_by_objd(struct ust_app *app,
int objd)
{
return ua_chan;
}
+/*
+ * Reply to a register channel notification from an application on the notify
+ * socket. The channel metadata is also created.
+ *
+ * The session UST registry lock is acquired in this function.
+ *
+ * On success 0 is returned else a negative value.
+ */
static int reply_ust_register_channel(int sock, int sobjd, int cobjd,
size_t nr_fields, struct ustctl_field *fields)
{
/* Lookup application. If not found, there is a code flow error. */
app = find_app_by_notify_sock(sock);
- assert(app);
+ if (!app) {
+ DBG("Application socket %d is being teardown. Abort event notify",
+ sock);
+ ret = 0;
+ goto error_rcu_unlock;
+ }
/* Lookup channel by UST object descriptor. Should always be found. */
ua_chan = find_channel_by_objd(app, cobjd);
error:
pthread_mutex_unlock(&ua_sess->registry.lock);
+error_rcu_unlock:
rcu_read_unlock();
return ret;
}
+/*
+ * Add event to the UST channel registry. When the event is added to the
+ * registry, the metadata is also created. Once done, this replies to the
+ * application with the appropriate error code.
+ *
+ * The session UST registry lock is acquired in the function.
+ *
+ * On success 0 is returned else a negative value.
+ */
static int add_event_ust_registry(int sock, int sobjd, int cobjd, char *name,
char *sig, size_t nr_fields, struct ustctl_field *fields, int loglevel,
char *model_emf_uri)
/* Lookup application. If not found, there is a code flow error. */
app = find_app_by_notify_sock(sock);
- assert(app);
+ if (!app) {
+ DBG("Application socket %d is being teardown. Abort event notify",
+ sock);
+ ret = 0;
+ goto error_rcu_unlock;
+ }
/* Lookup channel by UST object descriptor. Should always be found. */
ua_chan = find_channel_by_objd(app, cobjd);
pthread_mutex_lock(&ua_sess->registry.lock);
- ret_code = ust_registry_create_event(&ua_sess->registry, &ua_chan->registry, sobjd, cobjd,
- name, sig, nr_fields, fields, loglevel, model_emf_uri, &event_id);
+ ret_code = ust_registry_create_event(&ua_sess->registry,
+ &ua_chan->registry, sobjd, cobjd, name, sig, nr_fields, fields,
+ loglevel, model_emf_uri, &event_id);
/*
* The return value is returned to ustctl so in case of an error, the
goto error;
}
+ DBG3("UST registry event %s has been added successfully", name);
+
error:
pthread_mutex_unlock(&ua_sess->registry.lock);
+error_rcu_unlock:
rcu_read_unlock();
return ret;
}
+/*
+ * Handle application notification through the given notify socket.
+ *
+ * Return 0 on success or else a negative value.
+ */
int ust_app_recv_notify(int sock)
{
int ret;
error:
return ret;
}
+
+/*
+ * Once the notify socket hangs up, this is called. First, it tries to find the
+ * corresponding application. On failure, the call_rcu to close the socket is
+ * executed. If an application is found, it tries to delete it from the notify
+ * socket hash table. Whathever the result, it proceeds to the call_rcu.
+ *
+ * Note that an object needs to be allocated here so on ENOMEM failure, the
+ * call RCU is not done but the rest of the cleanup is.
+ */
+void ust_app_notify_sock_unregister(int sock)
+{
+ int err_enomem = 0;
+ struct lttng_ht_iter iter;
+ struct ust_app *app;
+ struct ust_app_notify_sock_obj *obj;
+
+ assert(sock >= 0);
+
+ rcu_read_lock();
+
+ obj = zmalloc(sizeof(*obj));
+ if (!obj) {
+ /*
+ * An ENOMEM is kind of uncool. If this strikes we continue the
+ * procedure but the call_rcu will not be called. In this case, we
+ * accept the fd leak rather than possibly creating an unsynchronized
+ * state between threads.
+ *
+ * TODO: The notify object should be created once the notify socket is
+ * registered and stored independantely from the ust app object. The
+ * tricky part is to synchronize the teardown of the application and
+ * this notify object. Let's keep that in mind so we can avoid this
+ * kind of shenanigans with ENOMEM in the teardown path.
+ */
+ err_enomem = 1;
+ } else {
+ obj->fd = sock;
+ }
+
+ DBG("UST app notify socket unregister %d", sock);
+
+ /*
+ * Lookup application by notify socket. If this fails, this means that the
+ * hash table delete has already been done by the application
+ * unregistration process so we can safely close the notify socket in a
+ * call RCU.
+ */
+ app = find_app_by_notify_sock(sock);
+ if (!app) {
+ goto close_socket;
+ }
+
+ iter.iter.node = &app->notify_sock_n.node;
+
+ /*
+ * Whatever happens here either we fail or succeed, in both cases we have
+ * to close the socket after a grace period to continue to the call RCU
+ * here. If the deletion is successful, the application is not visible
+ * anymore by other threads and is it fails it means that it was already
+ * deleted from the hash table so either way we just have to close the
+ * socket.
+ */
+ (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+
+close_socket:
+ rcu_read_unlock();
+
+ /*
+ * Close socket after a grace period to avoid for the socket to be reused
+ * before the application object is freed creating potential race between
+ * threads trying to add unique in the global hash table.
+ */
+ if (!err_enomem) {
+ call_rcu(&obj->head, close_notify_sock_rcu);
+ }
+}
#include "trace-ust.h"
#include "ust-registry.h"
-/* lttng-ust supported version. */
-//#define LTTNG_UST_COMM_MAJOR 2 /* comm protocol major version */
-//#define UST_APP_MAJOR_VERSION 3 /* Internal UST version supported */
-
#define UST_APP_EVENT_LIST_SIZE 32
-/* Process name (short). Extra for the NULL byte. */
-#define UST_APP_PROCNAME_LEN 17
+/* Process name (short). */
+#define UST_APP_PROCNAME_LEN 16
struct lttng_filter_bytecode;
struct lttng_ust_filter_bytecode;
extern int ust_consumerd64_fd, ust_consumerd32_fd;
+/*
+ * Object used to close the notify socket in a call_rcu(). Since the
+ * application might not be found, we need an independant object containing the
+ * notify socket fd.
+ */
+struct ust_app_notify_sock_obj {
+ int fd;
+ struct rcu_head head;
+};
+
struct ust_app_ht_key {
const char *name;
const struct lttng_ust_filter_bytecode *filter;
/* Channel and streams were sent to the UST tracer. */
int is_sent;
/* Unique key used to identify the channel on the consumer side. */
- unsigned long key;
+ uint64_t key;
/* Number of stream that this channel is expected to receive. */
unsigned int expected_stream_count;
char name[LTTNG_UST_SYM_NAME_LEN];
uid_t uid;
gid_t gid;
struct cds_list_head teardown_node;
+ /*
+ * Once at least *one* session is created onto the application, the
+ * corresponding consumer is set so we can use it on unregistration.
+ */
+ struct consumer_output *consumer;
};
/*
struct lttng_ust_tracer_version version;
uint32_t v_major; /* Version major number */
uint32_t v_minor; /* Version minor number */
- char name[UST_APP_PROCNAME_LEN];
+ /* Extra for the NULL byte. */
+ char name[UST_APP_PROCNAME_LEN + 1];
struct lttng_ht *sessions;
struct lttng_ht_node_ulong pid_n;
struct lttng_ht_node_ulong sock_n;
int ust_app_recv_notify(int sock);
void ust_app_add(struct ust_app *app);
struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock);
+void ust_app_notify_sock_unregister(int sock);
#else /* HAVE_LIBLTTNG_UST_CTL */
void ust_app_add(struct ust_app *app)
{
}
+static inline
+void ust_app_notify_sock_unregister(int sock)
+{
+}
#endif /* HAVE_LIBLTTNG_UST_CTL */
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/consumer.h>
struct consumer_socket *socket)
{
int ret;
- unsigned long key;
+ uint64_t key;
char *pathname = NULL;
struct lttcomm_consumer_msg msg;
/* We need at least one where 1 stream for 1 cpu. */
assert(ua_chan->expected_stream_count > 0);
- DBG2("UST ask channel %lu successfully done with %u stream(s)", key,
+ DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key,
ua_chan->expected_stream_count);
error:
error:
return ret;
}
+
+/*
+ * Send metadata string to consumer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_push_metadata(struct consumer_socket *socket,
+ struct ust_app_session *ua_sess, char *metadata_str,
+ size_t len, size_t target_offset)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+ assert(ua_sess);
+ assert(ua_sess->metadata);
+
+ DBG2("UST consumer push metadata to consumer socket %d", socket->fd);
+
+ msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
+ msg.u.push_metadata.key = ua_sess->metadata->key;
+ msg.u.push_metadata.target_offset = target_offset;
+ msg.u.push_metadata.len = len;
+
+ /*
+ * TODO: reenable these locks when the consumerd gets the ability to
+ * reorder the metadata it receives. This fits with locking in
+ * src/bin/lttng-sessiond/ust-app.c:push_metadata()
+ *
+ * pthread_mutex_lock(socket->lock);
+ */
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+ DBG3("UST consumer push metadata on sock %d of len %lu", socket->fd, len);
+
+ ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len);
+ if (ret < 0) {
+ fprintf(stderr, "send error: %d\n", ret);
+ goto error;
+ }
+
+ health_code_update();
+ ret = consumer_recv_status_reply(socket);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ health_code_update();
+ /*
+ * pthread_mutex_unlock(socket->lock);
+ */
+ return ret;
+}
+
+/*
+ * Send a close metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_close_metadata(struct consumer_socket *socket,
+ struct ust_app_channel *ua_chan)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(ua_chan);
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("UST consumer close metadata channel key %lu", ua_chan->key);
+
+ msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
+ msg.u.close_metadata.key = ua_chan->key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Send a setup metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_setup_metadata(struct consumer_socket *socket,
+ struct ust_app_channel *ua_chan)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(ua_chan);
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("UST consumer setup metadata channel key %lu", ua_chan->key);
+
+ msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
+ msg.u.setup_metadata.key = ua_chan->key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
int ust_consumer_send_channel_to_ust(struct ust_app *app,
struct ust_app_session *ua_sess, struct ust_app_channel *channel);
+int ust_consumer_push_metadata(struct consumer_socket *socket,
+ struct ust_app_session *ua_sess, char *metadata_str,
+ size_t len, size_t target_offset);
+
+int ust_consumer_close_metadata(struct consumer_socket *socket,
+ struct ust_app_channel *ua_chan);
+
+int ust_consumer_setup_metadata(struct consumer_socket *socket,
+ struct ust_app_channel *ua_chan);
+
#endif /* _UST_CONSUMER_H */
" tracer_name = \"lttng-ust\";\n"
" tracer_major = %u;\n"
" tracer_minor = %u;\n"
- " tracer_patchlevel = %u;\n"
- "};\n\n",
+ " tracer_patchlevel = %u;\n",
hostname,
app->version.major,
app->version.minor,
if (app) {
ret = lttng_metadata_printf(session,
" vpid = %d;\n"
- " procname = \"%s\";\n"
- "};\n\n",
+ " procname = \"%s\";\n",
(int) app->pid,
app->name
);
/* Generated metadata. */
char *metadata; /* NOT null-terminated ! Use memcpy. */
size_t metadata_len, metadata_alloc_len;
+ /* Length of bytes sent to the consumer. */
+ size_t metadata_len_sent;
};
struct ust_registry_channel {
goto error;
}
- ret = close(pollfd);
- if (ret < 0) {
- PERROR("close sock %d", pollfd);
- }
- lttng_fd_put(LTTNG_FD_APPS, 1);
+ /* The socket is closed after a grace period here. */
+ ust_app_notify_sock_unregister(pollfd);
} else if (revents & (LPOLLIN | LPOLLPRI)) {
ret = ust_app_recv_notify(pollfd);
if (ret < 0) {
- ret = lttng_poll_del(&events, pollfd);
- if (ret < 0) {
- goto error;
- }
-
- ret = close(pollfd);
- if (ret < 0) {
- PERROR("close sock %d", pollfd);
- }
- lttng_fd_put(LTTNG_FD_APPS, 1);
+ /*
+ * If the notification failed either the application is
+ * dead or an internal error happened. In both cases,
+ * we can only continue here. If the application is
+ * dead, an unregistration will follow or else the
+ * application will notice that we are not responding
+ * on that socket and will close it.
+ */
+ continue;
}
} else {
ERR("Unknown poll events %u for sock %d", revents, pollfd);
* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
-static struct lttng_consumer_stream *find_stream(int key,
+static struct lttng_consumer_stream *find_stream(uint64_t key,
struct lttng_ht *ht)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_consumer_stream *stream = NULL;
assert(ht);
- /* Negative keys are lookup failures */
- if (key < 0) {
+ /* -1ULL keys are lookup failures */
+ if (key == (uint64_t) -1ULL) {
return NULL;
}
rcu_read_lock();
- lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(ht, &key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
stream = caa_container_of(node, struct lttng_consumer_stream, node);
}
rcu_read_lock();
stream = find_stream(key, ht);
if (stream) {
- stream->key = -1;
+ stream->key = -1ULL;
/*
* We don't want the lookup to match, but we still need
* to iterate on this stream when iterating over the hash table. Just
* change the node key.
*/
- stream->node.key = -1;
+ stream->node.key = -1ULL;
}
rcu_read_unlock();
}
* RCU read side lock MUST be acquired before calling this function and
* protects the channel ptr.
*/
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_consumer_channel *channel = NULL;
- /* Negative keys are lookup failures */
- if (key < 0) {
+ /* -1ULL keys are lookup failures */
+ if (key == (uint64_t) -1ULL) {
return NULL;
}
- lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
static void free_stream_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_ht_node_u64 *node =
+ caa_container_of(head, struct lttng_ht_node_u64, head);
struct lttng_consumer_stream *stream =
caa_container_of(node, struct lttng_consumer_stream, node);
static void free_channel_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_ht_node_u64 *node =
+ caa_container_of(head, struct lttng_ht_node_u64, head);
struct lttng_consumer_channel *channel =
caa_container_of(node, struct lttng_consumer_channel, node);
*/
static void free_relayd_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_ht_node_u64 *node =
+ caa_container_of(head, struct lttng_ht_node_u64, head);
struct consumer_relayd_sock_pair *relayd =
caa_container_of(node, struct consumer_relayd_sock_pair, node);
int ret;
struct lttng_ht_iter iter;
- DBG("Consumer delete channel key %d", channel->key);
+ DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
call_rcu(&stream->node.head, free_stream_rcu);
}
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
- int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+ uint64_t stream_key,
enum lttng_consumer_stream_state state,
const char *channel_name,
uid_t uid,
}
/* Key is always the wait_fd for streams. */
- lttng_ht_node_init_ulong(&stream->node, stream->key);
+ lttng_ht_node_init_u64(&stream->node, stream->key);
/* Init session id node with the stream session id */
- lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+ lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
- DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64,
+ DBG3("Allocated stream %s (key %" PRIu64 ", relayd_id %" PRIu64 ", session_id %" PRIu64,
stream->name, stream->key, stream->net_seq_idx, stream->session_id);
rcu_read_unlock();
assert(stream);
assert(ht);
- DBG3("Adding consumer stream %d", stream->key);
+ DBG3("Adding consumer stream %" PRIu64, stream->key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
- lttng_ht_add_unique_ulong(ht, &stream->node);
+ lttng_ht_add_unique_u64(ht, &stream->node);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
assert(relayd);
lttng_ht_lookup(consumer_data.relayd_ht,
- (void *)((unsigned long) relayd->net_seq_idx), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ &relayd->net_seq_idx, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
goto end;
}
- lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
+ lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
end:
return ret;
obj->net_seq_idx = net_seq_idx;
obj->refcount = 0;
obj->destroy_flag = 0;
- lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
+ lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
error:
* RCU read-side lock must be held across this call and while using the
* returned object.
*/
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct consumer_relayd_sock_pair *relayd = NULL;
/* Negative keys are lookup failures */
- if (key < 0) {
+ if (key == (uint64_t) -1ULL) {
goto error;
}
- lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
+ lttng_ht_lookup(consumer_data.relayd_ht, &key,
&iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
}
strncpy(channel->name, name, sizeof(channel->name));
channel->name[sizeof(channel->name) - 1] = '\0';
- lttng_ht_node_init_ulong(&channel->node, channel->key);
+ lttng_ht_node_init_u64(&channel->node, channel->key);
CDS_INIT_LIST_HEAD(&channel->streams.head);
- DBG("Allocated channel (key %d)", channel->key)
+ DBG("Allocated channel (key %" PRIu64 ")", channel->key)
end:
return channel;
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
int ret = 0;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht,
- (void *)((unsigned long) channel->key), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ &channel->key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
/* Channel already exist. Ignore the insertion */
- ERR("Consumer add channel key %d already exists!", channel->key);
+ ERR("Consumer add channel key %" PRIu64 " already exists!",
+ channel->key);
ret = -1;
goto end;
}
- lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+ lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
end:
rcu_read_unlock();
lttng_ht_destroy(ht);
}
+void lttng_consumer_close_metadata(void)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ /*
+ * The Kernel consumer has a different metadata scheme so we don't
+ * close anything because the stream will be closed by the session
+ * daemon.
+ */
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Close all metadata streams. The metadata hash table is passed and
+ * this call iterates over it by closing all wakeup fd. This is safe
+ * because at this point we are sure that the metadata producer is
+ * either dead or blocked.
+ */
+ lttng_ustconsumer_close_metadata(metadata_ht);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+}
+
/*
* Clean up a metadata stream and free its memory.
*/
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
assert(stream);
assert(ht);
- DBG3("Adding metadata stream %d to hash table", stream->key);
+ DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
* Lookup the stream just to make sure it does not exist in our internal
* state. This should NEVER happen.
*/
- lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(ht, &stream->key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
assert(!node);
/* Find relayd and, if one is found, increment refcount. */
uatomic_dec(&stream->chan->nb_init_stream_left);
}
- lttng_ht_add_unique_ulong(ht, &stream->node);
+ lttng_ht_add_unique_u64(ht, &stream->node);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
rcu_read_unlock();
uint32_t revents, nb_fd;
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_poll_event events;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
rcu_register_thread();
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
goto error;
}
rcu_read_lock();
- lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
- &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ {
+ uint64_t tmp_id = (uint64_t) pollfd;
+
+ lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
+ }
+ node = lttng_ht_iter_get_node_u64(&iter);
assert(node);
stream = caa_container_of(node, struct lttng_consumer_stream,
rcu_register_thread();
- data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
goto end;
ret = add_stream(new_stream, data_ht);
if (ret) {
- ERR("Consumer add stream %d failed. Continuing",
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
new_stream->key);
/*
* At this point, if the add_stream fails, it is not in the
end:
DBG("Consumer thread sessiond poll exiting");
+ /*
+ * Close metadata streams since the producer is the session daemon which
+ * just died.
+ *
+ * NOTE: for now, this only applies to the UST tracer.
+ */
+ lttng_consumer_close_metadata();
+
/*
* when all fds have hung up, the polling thread
* can exit cleanly
*/
void lttng_consumer_init(void)
{
- consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
}
/*
goto error;
}
- DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+ DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
}
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
- ht->match_fct, (void *)((unsigned long) id),
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
LTTNG_CONSUMER_ASK_CHANNEL_CREATION,
LTTNG_CONSUMER_GET_CHANNEL,
LTTNG_CONSUMER_DESTROY_CHANNEL,
+ LTTNG_CONSUMER_PUSH_METADATA,
+ LTTNG_CONSUMER_CLOSE_METADATA,
+ LTTNG_CONSUMER_SETUP_METADATA,
};
/* State of each fd in consumer */
enum consumer_channel_type {
CONSUMER_CHANNEL_TYPE_METADATA = 0,
- CONSUMER_CHANNEL_TYPE_DATA = 1,
+ CONSUMER_CHANNEL_TYPE_DATA = 1,
};
struct stream_list {
struct lttng_consumer_channel {
/* HT node used for consumer_data.channel_ht */
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
/* Indexed key. Incremented value in the consumer. */
- int key;
+ uint64_t key;
/* Number of streams referencing this channel */
int refcount;
/* Tracing session id on the session daemon side. */
uid_t uid;
gid_t gid;
/* Relayd id of the channel. -1 if it does not apply. */
- int relayd_id;
+ int64_t relayd_id;
/*
* Number of streams NOT initialized yet. This is used in order to not
* delete this channel if streams are getting initialized.
* LTTNG_CONSUMER_GET_CHANNEL.
*/
struct stream_list streams;
+ /*
+ * Set if the channel is metadata. We keep a reference to the stream
+ * because we have to flush data once pushed by the session daemon. For a
+ * regular channel, this is always set to NULL.
+ */
+ struct lttng_consumer_stream *metadata_stream;
+ /*
+ * Metadata written so far. Helps keeping track of
+ * contiguousness and order.
+ */
+ uint64_t contig_metadata_written;
};
/*
*/
struct lttng_consumer_stream {
/* HT node used by the data_ht and metadata_ht */
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
/* HT node used in consumer_data.stream_list_ht */
- struct lttng_ht_node_ulong node_session_id;
+ struct lttng_ht_node_u64 node_session_id;
/* Pointer to associated channel. */
struct lttng_consumer_channel *chan;
/* Key by which the stream is indexed for 'node'. */
- int key;
+ uint64_t key;
/*
* File descriptor of the data output file. This can be either a file or a
* socket fd for relayd streaming.
uid_t uid;
gid_t gid;
/* Network sequence number. Indicating on which relayd socket it goes. */
- int net_seq_idx;
+ uint64_t net_seq_idx;
/* Identify if the stream is the metadata */
unsigned int metadata_flag;
/* Used when the stream is set for network streaming */
*/
struct consumer_relayd_sock_pair {
/* Network sequence number. */
- int net_seq_idx;
+ int64_t net_seq_idx;
/* Number of stream associated with this relayd */
unsigned int refcount;
* this socket is for now only used in a single thread.
*/
struct lttcomm_sock data_sock;
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
/* Session id on both sides for the sockets. */
uint64_t relayd_session_id;
*/
int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
- int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+ uint64_t stream_key,
enum lttng_consumer_stream_state state,
const char *channel_name,
uid_t uid,
int cpu,
int *alloc_ret,
enum consumer_channel_type type);
-struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
const char *pathname,
const char *name,
/* lttng-relayd consumer command */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
int net_seq_idx);
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key);
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
void consumer_steal_stream_key(int key, struct lttng_ht *ht);
return hash_match_key_ulong((void *) match_node->key, (void *) key);
}
+/*
+ * Match function for u64 node.
+ */
+static int match_u64(struct cds_lfht_node *node, const void *key)
+{
+ struct lttng_ht_node_u64 *match_node =
+ caa_container_of(node, struct lttng_ht_node_u64, node);
+
+ return hash_match_key_u64(&match_node->key, (void *) key);
+}
+
/*
* Return an allocated lttng hashtable.
*/
ht->match_fct = match_ulong;
ht->hash_fct = hash_key_ulong;
break;
+ case LTTNG_HT_TYPE_U64:
+ ht->match_fct = match_u64;
+ ht->hash_fct = hash_key_u64;
+ break;
default:
ERR("Unknown lttng hashtable type %d", type);
goto error;
cds_lfht_node_init(&node->node);
}
+/*
+ * Init lttng ht node uint64_t.
+ */
+void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node,
+ uint64_t key)
+{
+ assert(node);
+
+ node->key = key;
+ cds_lfht_node_init(&node->node);
+}
+
/*
* Free lttng ht node string.
*/
free(node);
}
+/*
+ * Free lttng ht node uint64_t.
+ */
+void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node)
+{
+ assert(node);
+ free(node);
+}
+
/*
* Lookup function in hashtable.
*/
&node->node);
}
+/*
+ * Add uint64_t node to hashtable.
+
+ */
+void lttng_ht_add_u64(struct lttng_ht *ht, struct lttng_ht_node_u64 *node)
+{
+ assert(ht);
+ assert(ht->ht);
+ assert(node);
+
+ cds_lfht_add(ht->ht, ht->hash_fct(&node->key, lttng_ht_seed),
+ &node->node);
+}
+
/*
* Add unique unsigned long node to hashtable.
*/
assert(node_ptr == &node->node);
}
+/*
+ * Add unique uint64_t node to hashtable.
+ */
+void lttng_ht_add_unique_u64(struct lttng_ht *ht,
+ struct lttng_ht_node_u64 *node)
+{
+ struct cds_lfht_node *node_ptr;
+ assert(ht);
+ assert(ht->ht);
+ assert(node);
+
+ node_ptr = cds_lfht_add_unique(ht->ht,
+ ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct,
+ &node->key, &node->node);
+ assert(node_ptr == &node->node);
+}
+
/*
* Add replace unsigned long node to hashtable.
*/
assert(node_ptr == &node->node);
}
+/*
+ * Add replace unsigned long node to hashtable.
+ */
+struct lttng_ht_node_u64 *lttng_ht_add_replace_u64(struct lttng_ht *ht,
+ struct lttng_ht_node_u64 *node)
+{
+ struct cds_lfht_node *node_ptr;
+ assert(ht);
+ assert(ht->ht);
+ assert(node);
+
+ node_ptr = cds_lfht_add_replace(ht->ht,
+ ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct,
+ &node->key, &node->node);
+ if (!node_ptr) {
+ return NULL;
+ } else {
+ return caa_container_of(node_ptr, struct lttng_ht_node_u64, node);
+ }
+ assert(node_ptr == &node->node);
+}
+
/*
* Delete node from hashtable.
*/
return caa_container_of(node, struct lttng_ht_node_ulong, node);
}
+/*
+ * Return lttng ht unsigned long node from iterator.
+ */
+struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64(
+ struct lttng_ht_iter *iter)
+{
+ struct cds_lfht_node *node;
+
+ assert(iter);
+ node = cds_lfht_iter_get_node(&iter->iter);
+ if (!node) {
+ return NULL;
+ }
+ return caa_container_of(node, struct lttng_ht_node_u64, node);
+}
+
/*
* lib constructor
*/
-static void __attribute__((constructor)) _init()
+static void __attribute__((constructor)) _init(void)
{
/* Init hash table seed */
lttng_ht_seed = (unsigned long) time(NULL);
#define _LTT_HT_H
#include <urcu.h>
+#include <stdint.h>
#include "rculfhash.h"
#include "rculfhash-internal.h"
enum lttng_ht_type {
LTTNG_HT_TYPE_STRING,
LTTNG_HT_TYPE_ULONG,
+ LTTNG_HT_TYPE_U64,
};
struct lttng_ht {
struct rcu_head head;
};
+struct lttng_ht_node_u64 {
+ uint64_t key;
+ struct cds_lfht_node node;
+ struct rcu_head head;
+};
+
/* Hashtable new and destroy */
extern struct lttng_ht *lttng_ht_new(unsigned long size, int type);
extern void lttng_ht_destroy(struct lttng_ht *ht);
extern void lttng_ht_node_init_str(struct lttng_ht_node_str *node, char *key);
extern void lttng_ht_node_init_ulong(struct lttng_ht_node_ulong *node,
unsigned long key);
+extern void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node,
+ uint64_t key);
extern void lttng_ht_node_free_str(struct lttng_ht_node_str *node);
extern void lttng_ht_node_free_ulong(struct lttng_ht_node_ulong *node);
+extern void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node);
extern void lttng_ht_lookup(struct lttng_ht *ht, void *key,
struct lttng_ht_iter *iter);
struct lttng_ht_node_str *node);
extern void lttng_ht_add_unique_ulong(struct lttng_ht *ht,
struct lttng_ht_node_ulong *node);
+extern void lttng_ht_add_unique_u64(struct lttng_ht *ht,
+ struct lttng_ht_node_u64 *node);
extern struct lttng_ht_node_ulong *lttng_ht_add_replace_ulong(
struct lttng_ht *ht, struct lttng_ht_node_ulong *node);
+extern struct lttng_ht_node_u64 *lttng_ht_add_replace_u64(
+ struct lttng_ht *ht, struct lttng_ht_node_u64 *node);
extern void lttng_ht_add_ulong(struct lttng_ht *ht,
struct lttng_ht_node_ulong *node);
+extern void lttng_ht_add_u64(struct lttng_ht *ht,
+ struct lttng_ht_node_u64 *node);
extern int lttng_ht_del(struct lttng_ht *ht, struct lttng_ht_iter *iter);
struct lttng_ht_iter *iter);
extern struct lttng_ht_node_ulong *lttng_ht_iter_get_node_ulong(
struct lttng_ht_iter *iter);
+extern struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64(
+ struct lttng_ht_iter *iter);
#endif /* _LTT_HT_H */
return c;
}
-#if (CAA_BITS_PER_LONG == 64)
-/*
- * Hash function for number value.
- */
LTTNG_HIDDEN
-unsigned long hash_key_ulong(void *_key, unsigned long seed)
+unsigned long hash_key_u64(void *_key, unsigned long seed)
{
union {
uint64_t v64;
} key;
v.v64 = (uint64_t) seed;
- key.v64 = (uint64_t) _key;
+ key.v64 = *(uint64_t *) _key;
hashword2(key.v32, 2, &v.v32[0], &v.v32[1]);
return v.v64;
}
+
+#if (CAA_BITS_PER_LONG == 64)
+/*
+ * Hash function for number value.
+ */
+LTTNG_HIDDEN
+unsigned long hash_key_ulong(void *_key, unsigned long seed)
+{
+ uint64_t __key = (uint64_t) _key;
+ return (unsigned long) hash_key_u64(&__key, seed);
+}
#else
/*
* Hash function for number value.
return 0;
}
+/*
+ * Hash function compare for number value.
+ */
+LTTNG_HIDDEN
+int hash_match_key_u64(void *key1, void *key2)
+{
+ if (*(uint64_t *) key1 == *(uint64_t *) key2) {
+ return 1;
+ }
+
+ return 0;
+}
+
/*
* Hash compare function for string.
*/
#include <stdint.h>
unsigned long hash_key_ulong(void *_key, unsigned long seed);
+unsigned long hash_key_u64(void *_key, unsigned long seed);
unsigned long hash_key_str(void *key, unsigned long seed);
int hash_match_key_ulong(void *key1, void *key2);
+int hash_match_key_u64(void *key1, void *key2);
int hash_match_key_str(void *key1, void *key2);
#endif /* _LTT_HT_UTILS_H */
goto end_nosignal;
}
- DBG("consumer_add_channel %d", msg.u.channel.channel_key);
+ DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
msg.u.channel.session_id, msg.u.channel.pathname,
msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
* We could not find the channel. Can happen if cpu hotplug
* happens while tearing down.
*/
- ERR("Unable to find channel key %d", msg.u.stream.channel_key);
+ ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
}
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
- } else if (new_stream->net_seq_idx != -1) {
- ERR("Network sequence index %d unknown. Not adding stream.",
+ } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+ ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
new_stream->net_seq_idx);
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
* network streaming or the full padding (len) size when we are _not_
* streaming.
*/
- if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
- (ret != len && stream->net_seq_idx == -1)) {
+ if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+ (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
/*
* Display the error but continue processing to try to release the
* subbuffer
}
/* Opening the tracefile in write mode */
- if (stream->net_seq_idx == -1) {
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid);
if (ret < 0) {
LTTCOMM_CONSUMERD_SPLICE_ENOMEM, /* ENOMEM from splice(2) */
LTTCOMM_CONSUMERD_SPLICE_ESPIPE, /* ESPIPE from splice(2) */
LTTCOMM_CONSUMERD_ENOMEM, /* Consumer is out of memory */
+ LTTCOMM_CONSUMERD_ERROR_METADATA, /* Error with metadata. */
+ LTTCOMM_CONSUMERD_FATAL, /* Fatal error. */
/* MUST be last element */
LTTCOMM_NR, /* Last element */
} LTTNG_PACKED;
struct lttcomm_sock {
- int fd;
+ int32_t fd;
enum lttcomm_sock_proto proto;
struct lttcomm_sockaddr sockaddr;
const struct lttcomm_proto_ops *ops;
uint32_t cmd_type; /* enum consumerd_command */
union {
struct {
- int channel_key;
+ uint64_t channel_key;
uint64_t session_id;
char pathname[PATH_MAX];
- uid_t uid;
- gid_t gid;
- int relayd_id;
+ uint32_t uid;
+ uint32_t gid;
+ uint64_t relayd_id;
/* nb_init_streams is the number of streams open initially. */
- unsigned int nb_init_streams;
+ uint32_t nb_init_streams;
char name[LTTNG_SYMBOL_NAME_LEN];
/* Use splice or mmap to consume this fd */
enum lttng_event_output output;
int type; /* Per cpu or metadata. */
} LTTNG_PACKED channel; /* Only used by Kernel. */
struct {
- int stream_key;
- int channel_key;
- int cpu; /* On which CPU this stream is assigned. */
+ uint64_t stream_key;
+ uint64_t channel_key;
+ int32_t cpu; /* On which CPU this stream is assigned. */
} LTTNG_PACKED stream; /* Only used by Kernel. */
struct {
- int net_index;
+ uint64_t net_index;
enum lttng_stream_type type;
/* Open socket to the relayd */
struct lttcomm_sock sock;
struct {
uint64_t subbuf_size; /* bytes */
uint64_t num_subbuf; /* power of 2 */
- int overwrite; /* 1: overwrite, 0: discard */
- unsigned int switch_timer_interval; /* usec */
- unsigned int read_timer_interval; /* usec */
- int output; /* splice, mmap */
- int type; /* metadata or per_cpu */
+ int32_t overwrite; /* 1: overwrite, 0: discard */
+ uint32_t switch_timer_interval; /* usec */
+ uint32_t read_timer_interval; /* usec */
+ int32_t output; /* splice, mmap */
+ int32_t type; /* metadata or per_cpu */
uint64_t session_id; /* Tracing session id */
char pathname[PATH_MAX]; /* Channel file path. */
char name[LTTNG_SYMBOL_NAME_LEN]; /* Channel name. */
- uid_t uid; /* User ID of the session */
- gid_t gid; /* Group ID ot the session */
- int relayd_id; /* Relayd id if apply. */
- unsigned long key; /* Unique channel key. */
+ uint32_t uid; /* User ID of the session */
+ uint32_t gid; /* Group ID ot the session */
+ uint64_t relayd_id; /* Relayd id if apply. */
+ uint64_t key; /* Unique channel key. */
unsigned char uuid[UUID_STR_LEN]; /* uuid for ust tracer. */
} LTTNG_PACKED ask_channel;
struct {
- unsigned long key;
+ uint64_t key;
} LTTNG_PACKED get_channel;
struct {
- unsigned long key;
+ uint64_t key;
} LTTNG_PACKED destroy_channel;
+ struct {
+ uint64_t key; /* Metadata channel key. */
+ uint64_t target_offset; /* Offset in the consumer */
+ uint64_t len; /* Length of metadata to be received. */
+ } LTTNG_PACKED push_metadata;
+ struct {
+ uint64_t key; /* Metadata channel key. */
+ } LTTNG_PACKED close_metadata;
+ struct {
+ uint64_t key; /* Metadata channel key. */
+ } LTTNG_PACKED setup_metadata;
} u;
} LTTNG_PACKED;
struct lttcomm_consumer_status_channel {
enum lttng_error_code ret_code;
- unsigned long key;
+ uint64_t key;
unsigned int stream_count;
} LTTNG_PACKED;
ret = consumer_add_channel(channel);
}
- DBG("UST consumer channel added (key: %u)", channel->key);
+ DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
error:
return ret;
*/
static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
const char *pathname, const char *name, uid_t uid, gid_t gid,
- int relayd_id, unsigned long key, enum lttng_event_output output)
+ int relayd_id, uint64_t key, enum lttng_event_output output)
{
assert(pathname);
assert(name);
if (ret < 0) {
goto error;
}
- } else if (stream->net_seq_idx != -1) {
- ERR("Network sequence index %d unknown. Not adding stream.",
+ } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
stream->net_seq_idx);
ret = -1;
goto error;
return ret;
}
+/*
+ * Create streams for the given channel using liblttng-ust-ctl.
+ *
+ * Return 0 on success else a negative value.
+ */
static int create_ust_streams(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
{
}
}
- DBG("UST consumer add stream %s (key: %d) with relayd id %" PRIu64,
+ DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
stream->name, stream->key, stream->relayd_stream_id);
/* Set next CPU stream. */
channel->streams.count = ++cpu;
+
+ /* Keep stream reference when creating metadata. */
+ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ channel->metadata_stream = stream;
+ }
}
return 0;
return ret;
}
+/*
+ * Send a single given stream to the session daemon using the sock.
+ *
+ * Return 0 on success else a negative value.
+ */
static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
{
int ret;
assert(stream);
assert(sock >= 0);
- DBG2("UST consumer sending stream %d to sessiond", stream->key);
+ DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
/* Send stream to session daemon. */
ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
/*
* Send channel to sessiond.
*
- * Return 0 on success or else a negative value. On error, the channel is
- * destroy using ustctl.
+ * Return 0 on success or else a negative value.
*/
static int send_sessiond_channel(int sock,
struct lttng_consumer_channel *channel,
return ret;
}
+/*
+ * Send all stream of a channel to the right thread handling it.
+ *
+ * On error, return a negative value else 0 on success.
+ */
+static int send_streams_to_thread(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+ struct lttng_consumer_stream *stream, *stmp;
+
+ assert(channel);
+ assert(ctx);
+
+ /* Send streams to the corresponding thread. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ /* Sending the stream to the thread. */
+ ret = send_stream_to_thread(stream, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ goto error;
+ }
+
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Write metadata to the given channel using ustctl to convert the string to
+ * the ringbuffer.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int push_metadata(struct lttng_consumer_channel *metadata,
+ const char *metadata_str, uint64_t target_offset, uint64_t len)
+{
+ int ret;
+
+ assert(metadata);
+ assert(metadata_str);
+
+ DBG("UST consumer writing metadata to channel %s", metadata->name);
+
+ assert(target_offset == metadata->contig_metadata_written);
+ ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
+ if (ret < 0) {
+ ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
+ goto error;
+ }
+ metadata->contig_metadata_written += len;
+
+ ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
+
+error:
+ return ret;
+}
+
+/*
+ * Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int close_metadata(uint64_t chan_key)
+{
+ int ret;
+ struct lttng_consumer_channel *channel;
+
+ DBG("UST consumer close metadata key %lu", chan_key);
+
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ ERR("UST consumer close metadata %lu not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+ if (ret < 0) {
+ ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
+{
+ int ret;
+ struct lttng_consumer_channel *metadata;
+
+ DBG("UST consumer setup metadata key %lu", key);
+
+ metadata = consumer_find_channel(key);
+ if (!metadata) {
+ ERR("UST consumer push metadata %" PRIu64 " not found", key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ /*
+ * Send metadata stream to relayd if one available. Availability is
+ * known if the stream is still in the list of the channel.
+ */
+ if (cds_list_empty(&metadata->streams.head)) {
+ ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+ /* Send metadata stream to relayd if needed. */
+ ret = send_stream_to_relayd(metadata->metadata_stream);
+ if (ret < 0) {
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+ ret = send_streams_to_thread(metadata, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+ /* List MUST be empty after or else it could be reused. */
+ assert(cds_list_empty(&metadata->streams.head));
+
+ ret = 0;
+
+error:
+ return ret;
+}
+
/*
* Receive command from session daemon and process it.
*
consumer_flag_relayd_for_destroy(relayd);
}
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- /* Somehow, the session daemon is not responding anymore. */
- goto end_nosignal;
- }
-
- goto end_nosignal;
+ goto end_msg_sessiond;
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
case LTTNG_CONSUMER_GET_CHANNEL:
{
int ret, relayd_err = 0;
- unsigned long key = msg.u.get_channel.key;
+ uint64_t key = msg.u.get_channel.key;
struct lttng_consumer_channel *channel;
- struct lttng_consumer_stream *stream, *stmp;
channel = consumer_find_channel(key);
if (!channel) {
goto error_fatal;
}
- /* Send streams to the corresponding thread. */
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
- /* Sending the stream to the thread. */
- ret = send_stream_to_thread(stream, ctx);
- if (ret < 0) {
- /*
- * If we are unable to send the stream to the thread, there is
- * a big problem so just stop everything.
- */
- goto error_fatal;
- }
-
- /* Remove node from the channel stream list. */
- cds_list_del(&stream->send_node);
+ ret = send_streams_to_thread(channel, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ goto error_fatal;
}
-
/* List MUST be empty after or else it could be reused. */
assert(cds_list_empty(&channel->streams.head));
- /* Inform sessiond that everything is done and OK on our side. */
- ret = consumer_send_status_msg(sock, LTTNG_OK);
- if (ret < 0) {
- /* Somehow, the session daemon is not responding anymore. */
- goto end_nosignal;
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_DESTROY_CHANNEL:
+ {
+ uint64_t key = msg.u.destroy_channel.key;
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer get channel key %lu not found", key);
+ ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
}
- break;
+ destroy_channel(channel);
+
+ goto end_msg_sessiond;
}
- case LTTNG_CONSUMER_DESTROY_CHANNEL:
+ case LTTNG_CONSUMER_CLOSE_METADATA:
+ {
+ int ret;
+
+ ret = close_metadata(msg.u.close_metadata.key);
+ if (ret != 0) {
+ ret_code = ret;
+ }
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_PUSH_METADATA:
{
int ret;
- unsigned long key = msg.u.destroy_channel.key;
+ uint64_t len = msg.u.push_metadata.len;
+ uint64_t target_offset = msg.u.push_metadata.target_offset;
+ uint64_t key = msg.u.push_metadata.key;
struct lttng_consumer_channel *channel;
+ char *metadata_str;
- DBG("UST consumer destroy channel key %lu", key);
+ DBG("UST consumer push metadata key %lu of len %lu", key, len);
channel = consumer_find_channel(key);
if (!channel) {
- ERR("UST consumer destroy channel %lu not found", key);
+ ERR("UST consumer push metadata %lu not found", key);
ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
- } else {
- /* Protocol error if the stream list is NOT empty. */
- assert(!cds_list_empty(&channel->streams.head));
- consumer_del_channel(channel);
}
+ metadata_str = zmalloc(len * sizeof(char));
+ if (!metadata_str) {
+ PERROR("zmalloc metadata string");
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+ goto end_msg_sessiond;
+ }
+
+ /* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(sock, LTTNG_OK);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ /* Wait for more data. */
+ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ goto end_nosignal;
+ }
+
+ /* Receive metadata string. */
+ ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+ if (ret < 0) {
+ /* Session daemon is dead so return gracefully. */
goto end_nosignal;
}
+
+ ret = push_metadata(channel, metadata_str, target_offset, len);
+ free(metadata_str);
+ if (ret < 0) {
+ /* Unable to handle metadata. Notify session daemon. */
+ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto end_msg_sessiond;
+ }
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_SETUP_METADATA:
+ {
+ int ret;
+
+ ret = setup_metadata(ctx, msg.u.setup_metadata.key);
+ if (ret) {
+ ret_code = ret;
+ }
+ goto end_msg_sessiond;
}
default:
break;
* The mmap operation should write subbuf_size amount of data when network
* streaming or the full padding (len) size when we are _not_ streaming.
*/
- if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
- (ret != len && stream->net_seq_idx == -1)) {
+ if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+ (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
/*
* Display the error but continue processing to try to release the
* subbuffer. This is a DBG statement since any unexpected kill or
char full_path[PATH_MAX];
/* Opening the tracefile in write mode */
- if (stream->net_seq_idx != -1) {
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
goto end;
}
end:
return ret;
}
+
+/*
+ * Close every metadata stream wait fd of the metadata hash table. This
+ * function MUST be used very carefully so not to run into a race between the
+ * metadata thread handling streams and this function closing their wait fd.
+ *
+ * For UST, this is used when the session daemon hangs up. Its the metadata
+ * producer so calling this is safe because we are assured that no state change
+ * can occur in the metadata thread for the streams in the hash table.
+ */
+void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ assert(metadata_ht);
+ assert(metadata_ht->ht);
+
+ DBG("UST consumer closing all metadata streams");
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
+ node.node) {
+ int fd = stream->wait_fd;
+
+ /*
+ * Whatever happens here we have to continue to try to close every
+ * streams. Let's report at least the error on failure.
+ */
+ ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+ if (ret) {
+ ERR("Unable to close metadata stream fd %d ret %d", fd, ret);
+ }
+ DBG("Metadata wait fd %d closed", fd);
+ }
+ rcu_read_unlock();
+}
unsigned long *off);
void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream);
int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream);
+void lttng_ustconsumer_close_metadata(struct lttng_ht *ht);
#else /* HAVE_LIBLTTNG_UST_CTL */
{
return NULL;
}
+static inline
+void lttng_ustconsumer_close_metadata(struct lttng_ht *ht)
+{
+}
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTTNG_USTCONSUMER_H */