1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
5 # This library is free software; you can redistribute it and/or modify it under
6 # the terms of the GNU Lesser General Public License as published by the Free
7 # Software Foundation; version 2.1 of the License.
9 # This library is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 # You should have received a copy of the GNU Lesser General Public License
15 # along with this library; if not, write to the Free Software Foundation, Inc.,
16 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 from __future__
import unicode_literals
29 from select
import epoll
, EPOLLIN
, EPOLLERR
, EPOLLHUP
31 from time
import sleep
33 __all__
= ["lttng-agent"]
34 __author__
= "David Goulet <dgoulet@efficios.com>"
38 LTTng agent python code. A LTTng Agent is responsible to spawn two threads,
39 the current UID and root session daemon. Those two threads register to the
40 right daemon and handle the tracing.
This class needs to be instantiated only once; once the init returns, tracing
46 SESSIOND_ADDR
= "127.0.0.1"
# Timeout for the semaphore in seconds.
53 # Session daemon register semaphore.
54 self
.register_sem
= threading
.Semaphore(LTTngAgent
.SEM_COUNT
);
56 self
.client_user
= LTTngTCPClient(LTTngAgent
.SESSIOND_ADDR
, self
.register_sem
)
57 self
.client_user
.start()
59 self
.client_root
= LTTngTCPClient(LTTngAgent
.SESSIOND_ADDR
, self
.register_sem
)
60 self
.client_root
.log_handler
.is_root
= True
61 self
.client_root
.start()
64 timeout
= LTTngAgent
.SEM_TIMEOUT
66 # Quit if timeout has reached 0 or below.
67 if acquire
== LTTngAgent
.SEM_COUNT
or timeout
<= 0:
70 # Acquire semaphore for *user* thread.
71 if not self
.register_sem
.acquire(False):
72 sleep(LTTngAgent
.SEM_WAIT_PERIOD
)
73 timeout
-= LTTngAgent
.SEM_WAIT_PERIOD
81 self
.client_user
.destroy()
82 self
.client_user
.join()
84 self
.client_root
.destroy()
85 self
.client_root
.join()
87 class LTTngCmdError(RuntimeError):
89 Command error thrown if an error is encountered in a command from the
93 def __init__(self
, code
):
94 super().__init
__('LTTng command error: code {}'.format(code
))
100 class LTTngUnknownCmdError(RuntimeError):
103 class LTTngLoggingHandler(logging
.Handler
):
105 Class handler for the Python logging API.
109 logging
.Handler
.__init
__(self
, level
= logging
.NOTSET
)
111 # Refcount tracking how many events have been enabled. This value above
112 # 0 means that the handler is attached to the root logger.
115 # Dict of enabled event. We track them so we know if it's ok to disable
116 # the received event.
117 self
.enabled_events
= {}
122 # Using the logging formatter to extract the asctime only.
123 self
.log_fmt
= logging
.Formatter("%(asctime)s")
124 self
.setFormatter(self
.log_fmt
)
126 # ctypes lib for lttng-ust
128 self
.lttng_ust
= ctypes
.cdll
.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
130 print("Unable to find libust for Python.")
def emit(self, record):
    """
    Fire the LTTng UST Python tracepoint with the given log record.

    The handler's formatter only produces the record's asctime; the other
    fields are read from the record itself. Every string is encoded to
    bytes before being handed to the ctypes-loaded ``py_tracepoint``.

    record -- the logging.LogRecord to trace.
    """
    asctime = self.format(record)

    # A LogRecord built without a function name (its constructor defaults
    # func to None) or with thread information disabled carries None in
    # these attributes. Substitute neutral values so the record is still
    # traced instead of raising AttributeError on None.encode().
    func_name = record.funcName or ""
    thread_name = record.threadName or ""
    thread_id = record.thread or 0

    self.lttng_ust.py_tracepoint(
        asctime.encode(),
        record.getMessage().encode(),
        record.name.encode(),
        func_name.encode(),
        record.lineno,
        record.levelno,
        thread_id,
        thread_name.encode(),
    )
143 def enable_event(self
, name
):
145 Enable an event name which will ultimately add an handler to the root
146 logger if none is present.
148 # Don't update the refcount if the event has been enabled prior.
149 if name
in self
.enabled_events
:
152 # Get the root logger and attach our handler.
153 root_logger
= logging
.getLogger()
154 # First thing first, we need to set the root logger to the loglevel
155 # NOTSET so we can catch everything. The default is 30.
156 root_logger
.setLevel(logging
.NOTSET
)
159 if self
.refcount
== 1:
160 root_logger
.addHandler(self
)
162 self
.enabled_events
[name
] = True
164 def disable_event(self
, name
):
Disable an event name, which will ultimately remove the handler from the
root logger once no enabled event remains.
170 if name
not in self
.enabled_events
:
171 # Event was not enabled prior, do nothing.
# Get the root logger so our handler can be detached from it.
175 root_logger
= logging
.getLogger()
178 if self
.refcount
== 0:
179 root_logger
.removeHandler(self
)
180 del self
.enabled_events
[name
]
def list_logger(self):
    """
    Return a list of the logger names currently registered with the
    logging module.

    The names are the keys of ``logging.Logger.manager.loggerDict``.
    """
    # Return a snapshot rather than the live keys view: the result is
    # iterated by the caller, and a logger created meanwhile would
    # otherwise change the dict during iteration.
    return list(logging.Logger.manager.loggerDict)
188 class LTTngSessiondCmd():
190 Class handling session daemon command.
193 # Command values from the agent protocol
203 # Python Logger LTTng domain value taken from lttng/domain.h
212 This is part of the command interface. Must be implemented.
214 raise NotImplementedError
216 class LTTngCommandReply():
218 Object that contains the information that should be replied to the session
219 daemon after a command execution.
222 def __init__(self
, payload
= None, reply
= True):
223 self
.payload
= payload
226 class LTTngCommandEnable(LTTngSessiondCmd
):
228 Handle the enable event command from the session daemon.
231 def __init__(self
, log_handler
, data
):
232 self
.log_handler
= log_handler
233 # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
236 data_size
= len(data
)
238 raise LTTngCmdError(LTTngSessiondCmd
.CODE_INVALID_CMD
)
241 self
.loglevel
, self
.loglevel_type
, self
.name
= \
242 struct
.unpack('>II%us' % (data_size
- name_offset
), data
)
243 # Remove trailing NULL bytes from name.
244 self
.name
= self
.name
.decode().rstrip('\x00')
246 raise LTTngCmdError(LTTngSessiondCmd
.CODE_INVALID_CMD
)
249 self
.log_handler
.enable_event(self
.name
)
250 return LTTngCommandReply()
252 class LTTngCommandDisable(LTTngSessiondCmd
):
254 Handle the disable event command from the session daemon.
257 def __init__(self
, log_handler
, data
):
258 self
.log_handler
= log_handler
260 data_size
= len(data
)
262 raise LTTngCmdError(LTTngSessiondCmd
.CODE_INVALID_CMD
)
265 self
.name
= struct
.unpack('>%us' % (data_size
), data
)[0]
266 # Remove trailing NULL bytes from name.
267 self
.name
= self
.name
.decode().rstrip('\x00')
269 raise LTTngCmdError(LTTngSessiondCmd
.CODE_INVALID_CMD
)
272 self
.log_handler
.disable_event(self
.name
)
273 return LTTngCommandReply()
275 class LTTngCommandRegDone(LTTngSessiondCmd
):
277 Handle register done command. This is sent back after a successful
278 registration from the session daemon. We basically release the given
279 semaphore so the agent can return to the caller.
282 def __init__(self
, sem
):
287 return LTTngCommandReply(reply
= False)
289 class LTTngCommandList(LTTngSessiondCmd
):
291 Handle the list command from the session daemon on the given socket.
294 def __init__(self
, log_handler
):
295 self
.log_handler
= log_handler
299 data
= logger_data
= bytearray()
301 loggers
= self
.log_handler
.list_logger()
# First, pack nb_event that must precede the data.
303 logger_data
+= struct
.pack('>I', len(loggers
))
305 # Populate payload with logger name.
306 for logger
in loggers
:
307 # Increment data size plus the NULL byte at the end of the name.
308 data_size
+= len(logger
) + 1
309 # Pack logger name and NULL byte.
310 logger_data
+= struct
.pack('>%usB' % (len(logger
)), \
311 bytes(bytearray(str.encode(logger
))), 0)
313 # Pack uint32_t data_size followed by nb event (number of logger)
314 data
= struct
.pack('>I', data_size
)
316 return LTTngCommandReply(payload
= data
)
318 class LTTngTCPClient(threading
.Thread
):
320 TCP client that register and receives command from the session daemon.
323 SYSTEM_PORT_FILE
= "/var/run/lttng/agent.port"
324 USER_PORT_FILE
= os
.path
.join(os
.path
.expanduser("~"), ".lttng/agent.port")
326 # The time in seconds this client should wait before trying again to
327 # register back to the session daemon.
def __init__(self, host, sem):
    """
    Set up the client thread state; nothing is connected yet.

    host -- address of the session daemon. The port is not given here; it
            is fetched dynamically when connecting.
    sem  -- the shared "register done" semaphore. It is acquired once
            here and must be released when a CMD_REG_DONE command is
            received.
    """
    threading.Thread.__init__(self)

    # Plain state first: where to connect, no socket yet, keep running.
    self.sessiond_host = host
    self.sessiond_sock = None
    self.quit_flag = False

    # Take our slot on the session daemon register semaphore.
    self.register_sem = sem
    self.register_sem.acquire()

    # The thread polls on this pipe to learn when it has to quit.
    pipe_fds = os.pipe()
    self.quit_pipe = pipe_fds

    # Each client owns its own LTTng logging handler.
    self.log_handler = LTTngLoggingHandler()
351 def cleanup_socket(self
, epfd
= None):
352 # Ease our life a bit.
353 sock
= self
.sessiond_sock
359 epfd
.unregister(sock
)
360 sock
.shutdown(SHUT_RDWR
)
363 # Cleanup fail, we can't do anything much...
368 self
.sessiond_sock
= None
371 self
.quit_flag
= True
373 fp
= os
.fdopen(self
.quit_pipe
[1], 'w')
381 Register to session daemon using the previously connected socket of the
388 data
= struct
.pack('>IIII', LTTngSessiondCmd
.DOMAIN
, os
.getpid(), \
389 LTTngSessiondCmd
.MAJOR_VERSION
, LTTngSessiondCmd
.MINOR_VERSION
)
390 self
.sessiond_sock
.send(data
)
394 Start the TCP client thread by registering to the session daemon and polling
395 on that socket for commands.
399 epfd
.register(self
.quit_pipe
[0], EPOLLIN
)
401 # Main loop to handle session daemon command and disconnection.
402 while not self
.quit_flag
:
404 # First, connect to the session daemon.
405 self
.connect_sessiond()
407 # Register to session daemon after a successful connection.
409 # Add registered socket to poll set.
410 epfd
.register(self
.sessiond_sock
, EPOLLIN | EPOLLERR | EPOLLHUP
)
412 self
.quit_flag
= self
.wait_cmd(epfd
)
414 # Whatever happens here, we have to close down everything and
415 # retry to connect to the session daemon since either the
416 # socket is closed or invalid data was sent.
417 self
.cleanup_socket(epfd
)
418 self
.register_sem
.release()
419 sleep(LTTngTCPClient
.WAIT_TIME
)
422 self
.cleanup_socket(epfd
)
423 os
.close(self
.quit_pipe
[0])
def recv_header(self, sock):
    """
    Receive one command header from the given socket and return its
    unpacked fields.

    The header is read with the big-endian format '>QII' (one 64-bit
    unsigned integer followed by two 32-bit unsigned integers). The caller
    in this file unpacks the result as (data_size, cmd, cmd_version).

    Raises IOError(errno.ESHUTDOWN) when nothing was received and
    IOError(errno.EINVAL) when the received bytes do not form a complete
    header.
    """
    header = struct.Struct('>QII')
    raw = sock.recv(header.size)

    # An empty read is reported as a shutdown of the connection.
    if not raw:
        raise IOError(errno.ESHUTDOWN)

    # Unpacking '>QII' can only fail on a size mismatch, so check it up
    # front and report it as invalid data.
    if len(raw) != header.size:
        raise IOError(errno.EINVAL)

    return header.unpack(raw)
def create_command(self, cmd_type, version, data):
    """
    Build the command object matching the given command type.

    cmd_type -- command value received from the session daemon.
    version  -- command version; unused since there is only one for now.
    data     -- raw payload, forwarded to the enable/disable commands.

    Raises LTTngUnknownCmdError when cmd_type is not a known command.
    """
    # Factories are lazy so that only the requested command is built.
    factories = {
        LTTngSessiondCmd.CMD_LIST:
            lambda: LTTngCommandList(self.log_handler),
        LTTngSessiondCmd.CMD_ENABLE:
            lambda: LTTngCommandEnable(self.log_handler, data),
        LTTngSessiondCmd.CMD_DISABLE:
            lambda: LTTngCommandDisable(self.log_handler, data),
        LTTngSessiondCmd.CMD_REG_DONE:
            lambda: LTTngCommandRegDone(self.register_sem),
    }

    factory = factories.get(cmd_type)
    if factory is None:
        raise LTTngUnknownCmdError()

    return factory()
def pack_code(self, code):
    """
    Return the given code packed as a 4-byte big-endian unsigned integer.
    """
    code_fmt = struct.Struct('>I')
    return code_fmt.pack(code)
473 def handle_command(self
, data
, cmd_type
, cmd_version
):
475 Handle the given command type with the received payload. This function
476 sends back data to the session daemon using to the return value of the
479 payload
= bytearray()
482 cmd
= self
.create_command(cmd_type
, cmd_version
, data
)
483 cmd_reply
= cmd
.execute()
484 # Set success code in data
485 payload
+= self
.pack_code(LTTngSessiondCmd
.CODE_SUCCESS
)
486 if cmd_reply
.payload
is not None:
487 payload
+= cmd_reply
.payload
488 except LTTngCmdError
as e
:
489 # Set error code in payload
490 payload
+= self
.pack_code(e
.get_code())
491 except LTTngUnknownCmdError
:
492 # Set error code in payload
493 payload
+= self
.pack_code(LTTngSessiondCmd
.CODE_INVALID_CMD
)
495 # Send response only if asked for.
497 self
.sessiond_sock
.send(payload
)
499 def wait_cmd(self
, epfd
):
505 # Poll on socket for command.
507 except select
.error
as e
:
508 raise IOError(e
.errno
, e
.message
)
510 for fileno
, event
in events
:
511 if fileno
== self
.quit_pipe
[0]:
513 elif event
& (EPOLLERR | EPOLLHUP
):
514 raise IOError(errno
.ESHUTDOWN
)
515 elif event
& EPOLLIN
:
518 data_size
, cmd
, cmd_version
= self
.recv_header(self
.sessiond_sock
)
520 data
+= self
.sessiond_sock
.recv(data_size
)
522 self
.handle_command(data
, cmd
, cmd_version
)
524 raise IOError(errno
.ESHUTDOWN
)
def get_port_from_file(self, path):
    """
    Open the session daemon agent port file and return the port it holds.

    path -- path of the agent port file.

    Returns the port read from the first line of the file when it is a
    valid TCP port (1 to 65535). Returns 0 when the file cannot be read,
    does not contain an integer, or holds an out-of-range value.
    """
    # By default, the port is set to 0 so if we can not find the agent
    # port file simply don't try to connect. A value set to 0 indicates
    # that.
    port = 0

    try:
        # The context manager closes the file even when int() raises.
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, ValueError):
        return port

    # Both bounds must hold; joining them with "or" accepts every integer.
    if 0 < r_port <= 65535:
        port = r_port

    return port
549 def connect_sessiond(self
):
551 Connect sessiond_sock to running session daemon using the port file.
553 # Create session daemon TCP socket
554 if not self
.sessiond_sock
:
555 self
.sessiond_sock
= socket(AF_INET
, SOCK_STREAM
)
557 if self
.log_handler
.is_root
:
558 port
= self
.get_port_from_file(LTTngTCPClient
.SYSTEM_PORT_FILE
)
560 port
= self
.get_port_from_file(LTTngTCPClient
.USER_PORT_FILE
)
562 # No session daemon available
564 raise IOError(errno
.ECONNREFUSED
)
566 # Can raise an IOError so caller must catch it.
567 self
.sessiond_sock
.connect((self
.sessiond_host
, port
))