--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
+#
+# This library is free software; you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import unicode_literals
+
+import ctypes
+import errno
+import logging
+import os
+import sys
+import threading
+import struct
+import select
+
+from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP
+from socket import *
+from time import sleep
+
+__all__ = ["lttng-agent"]
+__author__ = "David Goulet <dgoulet@efficios.com>"
+
+class LTTngAgent():
+ """
+ LTTng agent python code. A LTTng Agent is responsible to spawn two threads,
+ the current UID and root session daemon. Those two threads register to the
+ right daemon and handle the tracing.
+
+ This class needs to be instantiate once and once the init returns, tracing
+ is ready to happen.
+ """
+
+ SESSIOND_ADDR = "127.0.0.1"
+ SEM_COUNT = 2
+ # Timeout for the sempahore in seconds.
+ SEM_TIMEOUT = 5
+ SEM_WAIT_PERIOD = 0.2
+
+ def __init__(self):
+ # Session daemon register semaphore.
+ self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT);
+
+ self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
+ self.client_user.start()
+
+ self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
+ self.client_root.log_handler.is_root = True
+ self.client_root.start()
+
+ acquire = 0
+ timeout = LTTngAgent.SEM_TIMEOUT
+ while True:
+ # Quit if timeout has reached 0 or below.
+ if acquire == LTTngAgent.SEM_COUNT or timeout <= 0:
+ break;
+
+ # Acquire semaphore for *user* thread.
+ if not self.register_sem.acquire(False):
+ sleep(LTTngAgent.SEM_WAIT_PERIOD)
+ timeout -= LTTngAgent.SEM_WAIT_PERIOD
+ else:
+ acquire += 1
+
+ def __del__(self):
+ self.destroy()
+
+ def destroy(self):
+ self.client_user.destroy()
+ self.client_user.join()
+
+ self.client_root.destroy()
+ self.client_root.join()
+
+class LTTngCmdError(RuntimeError):
+ """
+ Command error thrown if an error is encountered in a command from the
+ session daemon.
+ """
+
+ def __init__(self, code):
+ super().__init__('LTTng command error: code {}'.format(code))
+ self._code = code
+
+ def get_code(self):
+ return self._code
+
+class LTTngUnknownCmdError(RuntimeError):
+ pass
+
+class LTTngLoggingHandler(logging.Handler):
+ """
+ Class handler for the Python logging API.
+ """
+
+ def __init__(self):
+ logging.Handler.__init__(self, level = logging.NOTSET)
+
+ # Refcount tracking how many events have been enabled. This value above
+ # 0 means that the handler is attached to the root logger.
+ self.refcount = 0
+
+ # Dict of enabled event. We track them so we know if it's ok to disable
+ # the received event.
+ self.enabled_events = {}
+
+ # Am I root ?
+ self.is_root = False
+
+ # Using the logging formatter to extract the asctime only.
+ self.log_fmt = logging.Formatter("%(asctime)s")
+ self.setFormatter(self.log_fmt)
+
+ # ctypes lib for lttng-ust
+ try:
+ self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
+ except OSError as e:
+ print("Unable to find libust for Python.")
+
+ def emit(self, record):
+ """
+ Fire LTTng UST tracepoint with the given record.
+ """
+ asctime = self.format(record)
+
+ self.lttng_ust.py_tracepoint(asctime.encode(),
+ record.getMessage().encode(), record.name.encode(),
+ record.funcName.encode(), record.lineno, record.levelno,
+ record.thread, record.threadName.encode())
+
+ def enable_event(self, name):
+ """
+ Enable an event name which will ultimately add an handler to the root
+ logger if none is present.
+ """
+ # Don't update the refcount if the event has been enabled prior.
+ if name in self.enabled_events:
+ return
+
+ # Get the root logger and attach our handler.
+ root_logger = logging.getLogger()
+ # First thing first, we need to set the root logger to the loglevel
+ # NOTSET so we can catch everything. The default is 30.
+ root_logger.setLevel(logging.NOTSET)
+
+ self.refcount += 1
+ if self.refcount == 1:
+ root_logger.addHandler(self)
+
+ self.enabled_events[name] = True
+
+ def disable_event(self, name):
+ """
+ Disable an event name which will ultimately add an handler to the root
+ logger if none is present.
+ """
+
+ if name not in self.enabled_events:
+ # Event was not enabled prior, do nothing.
+ return
+
+ # Get the root logger and attach our handler.
+ root_logger = logging.getLogger()
+
+ self.refcount -= 1
+ if self.refcount == 0:
+ root_logger.removeHandler(self)
+ del self.enabled_events[name]
+
+ def list_logger(self):
+ """
+ Return a list of logger name.
+ """
+ return logging.Logger.manager.loggerDict.keys()
+
+class LTTngSessiondCmd():
+ """
+ Class handling session daemon command.
+ """
+
+ # Command values from the agent protocol
+ CMD_LIST = 1
+ CMD_ENABLE = 2
+ CMD_DISABLE = 3
+ CMD_REG_DONE = 4
+
+ # Return code
+ CODE_SUCCESS = 1
+ CODE_INVALID_CMD = 2
+
+ # Python Logger LTTng domain value taken from lttng/domain.h
+ DOMAIN = 5
+
+ # Protocol version
+ MAJOR_VERSION = 1
+ MINOR_VERSION = 0
+
+ def execute(self):
+ """
+ This is part of the command interface. Must be implemented.
+ """
+ raise NotImplementedError
+
+class LTTngCommandReply():
+ """
+ Object that contains the information that should be replied to the session
+ daemon after a command execution.
+ """
+
+ def __init__(self, payload = None, reply = True):
+ self.payload = payload
+ self.reply = reply
+
+class LTTngCommandEnable(LTTngSessiondCmd):
+ """
+ Handle the enable event command from the session daemon.
+ """
+
+ def __init__(self, log_handler, data):
+ self.log_handler = log_handler
+ # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
+ name_offset = 8;
+
+ data_size = len(data)
+ if data_size == 0:
+ raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+ try:
+ self.loglevel, self.loglevel_type, self.name = \
+ struct.unpack('>II%us' % (data_size - name_offset), data)
+ # Remove trailing NULL bytes from name.
+ self.name = self.name.decode().rstrip('\x00')
+ except struct.error:
+ raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+ def execute(self):
+ self.log_handler.enable_event(self.name)
+ return LTTngCommandReply()
+
+class LTTngCommandDisable(LTTngSessiondCmd):
+ """
+ Handle the disable event command from the session daemon.
+ """
+
+ def __init__(self, log_handler, data):
+ self.log_handler = log_handler
+
+ data_size = len(data)
+ if data_size == 0:
+ raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+ try:
+ self.name = struct.unpack('>%us' % (data_size), data)[0]
+ # Remove trailing NULL bytes from name.
+ self.name = self.name.decode().rstrip('\x00')
+ except struct.error:
+ raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+ def execute(self):
+ self.log_handler.disable_event(self.name)
+ return LTTngCommandReply()
+
+class LTTngCommandRegDone(LTTngSessiondCmd):
+ """
+ Handle register done command. This is sent back after a successful
+ registration from the session daemon. We basically release the given
+ semaphore so the agent can return to the caller.
+ """
+
+ def __init__(self, sem):
+ self.sem = sem
+
+ def execute(self):
+ self.sem.release()
+ return LTTngCommandReply(reply = False)
+
+class LTTngCommandList(LTTngSessiondCmd):
+ """
+ Handle the list command from the session daemon on the given socket.
+ """
+
+ def __init__(self, log_handler):
+ self.log_handler = log_handler
+
+ def execute(self):
+ data_size = 0
+ data = logger_data = bytearray()
+
+ loggers = self.log_handler.list_logger()
+ # First, pack nb_event that must preceed the data.
+ logger_data += struct.pack('>I', len(loggers))
+
+ # Populate payload with logger name.
+ for logger in loggers:
+ # Increment data size plus the NULL byte at the end of the name.
+ data_size += len(logger) + 1
+ # Pack logger name and NULL byte.
+ logger_data += struct.pack('>%usB' % (len(logger)), \
+ bytes(bytearray(str.encode(logger))), 0)
+
+ # Pack uint32_t data_size followed by nb event (number of logger)
+ data = struct.pack('>I', data_size)
+ data += logger_data
+ return LTTngCommandReply(payload = data)
+
+class LTTngTCPClient(threading.Thread):
+ """
+ TCP client that register and receives command from the session daemon.
+ """
+
+ SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
+ USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")
+
+ # The time in seconds this client should wait before trying again to
+ # register back to the session daemon.
+ WAIT_TIME = 3
+
+ def __init__(self, host, sem):
+ threading.Thread.__init__(self)
+
+ # Which host to connect to. The port is fetch dynamically.
+ self.sessiond_host = host
+
+ # The session daemon register done semaphore. Needs to be released when
+ # receiving a CMD_REG_DONE command.
+ self.register_sem = sem
+ self.register_sem.acquire()
+
+ # Indicate that we have to quit thus stop the main loop.
+ self.quit_flag = False
+ # Quit pipe. The thread poll on it to know when to quit.
+ self.quit_pipe = os.pipe()
+
+ # Socket on which we communicate with the session daemon.
+ self.sessiond_sock = None
+ # LTTng Logging Handler
+ self.log_handler = LTTngLoggingHandler()
+
+ def cleanup_socket(self, epfd = None):
+ # Ease our life a bit.
+ sock = self.sessiond_sock
+ if not sock:
+ return
+
+ try:
+ if epfd is not None:
+ epfd.unregister(sock)
+ sock.shutdown(SHUT_RDWR)
+ sock.close()
+ except select.error:
+ # Cleanup fail, we can't do anything much...
+ pass
+ except IOError:
+ pass
+
+ self.sessiond_sock = None
+
+ def destroy(self):
+ self.quit_flag = True
+ try:
+ fp = os.fdopen(self.quit_pipe[1], 'w')
+ fp.write("42")
+ fp.close()
+ except OSError as e:
+ pass
+
+ def register(self):
+ """
+ Register to session daemon using the previously connected socket of the
+ class.
+
+ Command ABI:
+ uint32 domain
+ uint32 pid
+ """
+ data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(), \
+ LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION)
+ self.sessiond_sock.send(data)
+
+ def run(self):
+ """
+ Start the TCP client thread by registering to the session daemon and polling
+ on that socket for commands.
+ """
+
+ epfd = epoll()
+ epfd.register(self.quit_pipe[0], EPOLLIN)
+
+ # Main loop to handle session daemon command and disconnection.
+ while not self.quit_flag:
+ try:
+ # First, connect to the session daemon.
+ self.connect_sessiond()
+
+ # Register to session daemon after a successful connection.
+ self.register()
+ # Add registered socket to poll set.
+ epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP)
+
+ self.quit_flag = self.wait_cmd(epfd)
+ except IOError as e:
+ # Whatever happens here, we have to close down everything and
+ # retry to connect to the session daemon since either the
+ # socket is closed or invalid data was sent.
+ self.cleanup_socket(epfd)
+ self.register_sem.release()
+ sleep(LTTngTCPClient.WAIT_TIME)
+ continue
+
+ self.cleanup_socket(epfd)
+ os.close(self.quit_pipe[0])
+ epfd.close()
+
+ def recv_header(self, sock):
+ """
+ Receive the command header from the given socket. Set the internal
+ state of this object with the header data.
+
+ Header ABI is defined like this:
+ uint64 data_size
+ uint32 cmd
+ uint32 cmd_version
+ """
+ s_pack = struct.Struct('>QII')
+
+ pack_data = sock.recv(s_pack.size)
+ data_received = len(pack_data)
+ if data_received == 0:
+ raise IOError(errno.ESHUTDOWN)
+
+ try:
+ return s_pack.unpack(pack_data)
+ except struct.error:
+ raise IOError(errno.EINVAL)
+
+ def create_command(self, cmd_type, version, data):
+ """
+ Return the right command object using the given command type. The
+ command version is unused since we only have once for now.
+ """
+
+ cmd_dict = {
+ LTTngSessiondCmd.CMD_LIST: \
+ lambda: LTTngCommandList(self.log_handler),
+ LTTngSessiondCmd.CMD_ENABLE: \
+ lambda: LTTngCommandEnable(self.log_handler, data),
+ LTTngSessiondCmd.CMD_DISABLE: \
+ lambda: LTTngCommandDisable(self.log_handler, data),
+ LTTngSessiondCmd.CMD_REG_DONE: \
+ lambda: LTTngCommandRegDone(self.register_sem),
+ }
+
+ if cmd_type in cmd_dict:
+ return cmd_dict[cmd_type]()
+ else:
+ raise LTTngUnknownCmdError()
+
+ def pack_code(self, code):
+ return struct.pack('>I', code)
+
+ def handle_command(self, data, cmd_type, cmd_version):
+ """
+ Handle the given command type with the received payload. This function
+ sends back data to the session daemon using to the return value of the
+ command.
+ """
+ payload = bytearray()
+
+ try:
+ cmd = self.create_command(cmd_type, cmd_version, data)
+ cmd_reply = cmd.execute()
+ # Set success code in data
+ payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
+ if cmd_reply.payload is not None:
+ payload += cmd_reply.payload
+ except LTTngCmdError as e:
+ # Set error code in payload
+ payload += self.pack_code(e.get_code())
+ except LTTngUnknownCmdError:
+ # Set error code in payload
+ payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+ # Send response only if asked for.
+ if cmd_reply.reply:
+ self.sessiond_sock.send(payload)
+
+ def wait_cmd(self, epfd):
+ """
+ """
+
+ while True:
+ try:
+ # Poll on socket for command.
+ events = epfd.poll()
+ except select.error as e:
+ raise IOError(e.errno, e.message)
+
+ for fileno, event in events:
+ if fileno == self.quit_pipe[0]:
+ return True
+ elif event & (EPOLLERR | EPOLLHUP):
+ raise IOError(errno.ESHUTDOWN)
+ elif event & EPOLLIN:
+ data = bytearray()
+
+ data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock)
+ if data_size:
+ data += self.sessiond_sock.recv(data_size)
+
+ self.handle_command(data, cmd, cmd_version)
+ else:
+ raise IOError(errno.ESHUTDOWN)
+
+ def get_port_from_file(self, path):
+ """
+ Open the session daemon agent port file and returns the value. If none
+ found, 0 is returned.
+ """
+
+ # By default, the port is set to 0 so if we can not find the agent port
+ # file simply don't try to connect. A value set to 0 indicates that.
+ port = 0
+
+ try:
+ f = open(path, "r")
+ r_port = int(f.readline())
+ if r_port > 0 or r_port <= 65535:
+ port = r_port
+ f.close()
+ except IOError as e:
+ pass
+ except ValueError as e:
+ pass
+
+ return port
+
+ def connect_sessiond(self):
+ """
+ Connect sessiond_sock to running session daemon using the port file.
+ """
+ # Create session daemon TCP socket
+ if not self.sessiond_sock:
+ self.sessiond_sock = socket(AF_INET, SOCK_STREAM)
+
+ if self.log_handler.is_root:
+ port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
+ else:
+ port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)
+
+ # No session daemon available
+ if port == 0:
+ raise IOError(errno.ECONNREFUSED)
+
+ # Can raise an IOError so caller must catch it.
+ self.sessiond_sock.connect((self.sessiond_host, port))