Commit | Line | Data |
---|---|---|
c3e14096 DG |
1 | # -*- coding: utf-8 -*- |
2 | # | |
3 | # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com> | |
4 | # | |
5 | # This library is free software; you can redistribute it and/or modify it under | |
6 | # the terms of the GNU Lesser General Public License as published by the Free | |
7 | # Software Foundation; version 2.1 of the License. | |
8 | # | |
9 | # This library is distributed in the hope that it will be useful, but WITHOUT | |
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
11 | # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more | |
12 | # details. | |
13 | # | |
14 | # You should have received a copy of the GNU Lesser General Public License | |
15 | # along with this library; if not, write to the Free Software Foundation, Inc., | |
16 | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
17 | ||
18 | from __future__ import unicode_literals | |
19 | ||
20 | import ctypes | |
21 | import errno | |
22 | import logging | |
23 | import os | |
24 | import sys | |
25 | import threading | |
26 | import struct | |
27 | import select | |
28 | ||
29 | from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP | |
30 | from socket import * | |
31 | from time import sleep | |
32 | ||
# Names exported by "from <module> import *"; every entry must be an
# identifier defined in this module. The agent class is the public entry point.
__all__ = ["LTTngAgent"]
__author__ = "David Goulet <dgoulet@efficios.com>"
35 | ||
class LTTngAgent():
    """
    LTTng agent python code. A LTTng Agent spawns two client threads, one for
    the current UID session daemon and one for the root session daemon. Those
    two threads register to their daemon and handle the tracing.

    This class needs to be instantiated once. __init__ waits (for at most about
    SEM_TIMEOUT seconds) for the clients to report back through register_sem,
    after which tracing is ready to happen.
    """

    SESSIOND_ADDR = "127.0.0.1"
    SEM_COUNT = 2
    # Timeout for the semaphore in seconds.
    SEM_TIMEOUT = 5
    SEM_WAIT_PERIOD = 0.2

    def __init__(self):
        # Becomes True once destroy() has run; see destroy().
        self._destroyed = False

        # Session daemon register semaphore. Each client takes one unit when it
        # is constructed and gives it back once it registered (or failed to).
        self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT)

        self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
        self.client_user.start()

        self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
        self.client_root.log_handler.is_root = True
        self.client_root.start()

        # Take back SEM_COUNT units (one per client), polling without blocking
        # so that the total wait is bounded by SEM_TIMEOUT.
        acquired = 0
        remaining = LTTngAgent.SEM_TIMEOUT
        while acquired < LTTngAgent.SEM_COUNT and remaining > 0:
            if self.register_sem.acquire(False):
                acquired += 1
            else:
                sleep(LTTngAgent.SEM_WAIT_PERIOD)
                remaining -= LTTngAgent.SEM_WAIT_PERIOD

    def __del__(self):
        self.destroy()

    def destroy(self):
        """
        Stop both client threads and wait for them to exit.

        Only the first call does anything, so an explicit destroy() followed
        by __del__ tears down once. A partially constructed object (__init__
        raised before both clients existed) is handled too, because __del__
        still runs in that case.
        """
        if getattr(self, '_destroyed', False):
            return
        self._destroyed = True

        for client in (getattr(self, 'client_user', None),
                       getattr(self, 'client_root', None)):
            if client is None:
                continue
            client.destroy()
            client.join()
86 | ||
class LTTngCmdError(RuntimeError):
    """
    Raised while decoding or running a session daemon command that turns out
    to be invalid. The carried code is what gets reported back as the status
    of the command.
    """

    def __init__(self, code):
        message = 'LTTng command error: code {}'.format(code)
        super().__init__(message)
        self._code = code

    def get_code(self):
        """Return the status code attached to this error."""
        code = self._code
        return code
99 | ||
class LTTngUnknownCmdError(RuntimeError):
    """
    Raised when the session daemon sends a command type that this agent has
    no handler for.
    """
102 | ||
class LTTngLoggingHandler(logging.Handler):
    """
    Handler for the Python logging API that turns log records into LTTng-UST
    tracepoints through the liblttng-ust-python-agent shared library.

    The handler is attached to the root logger while at least one event is
    enabled, and detached when the last enabled event is disabled.
    """

    def __init__(self):
        logging.Handler.__init__(self, level = logging.NOTSET)

        # Number of enabled events. A value above 0 means that the handler is
        # attached to the root logger.
        self.refcount = 0

        # Enabled event names. Tracking them keeps enable_event from counting
        # the same name twice and makes disabling an unknown name a no-op.
        self.enabled_events = {}

        # True for the handler of the client talking to the root session
        # daemon; selects which port file the client reads.
        self.is_root = False

        # Using the logging formatter to extract the asctime only.
        self.log_fmt = logging.Formatter("%(asctime)s")
        self.setFormatter(self.log_fmt)

        # ctypes handle on the lttng-ust Python agent library. It stays None
        # when the library cannot be loaded, and emit() then does nothing
        # instead of raising AttributeError on every log record.
        self.lttng_ust = None
        try:
            self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
        except OSError:
            # Diagnostic on stderr so it does not mix with program output.
            print("Unable to find libust for Python.", file=sys.stderr)

    def emit(self, record):
        """
        Fire the LTTng UST tracepoint with the given record. This is a no-op
        when the lttng-ust library could not be loaded.
        """
        if self.lttng_ust is None:
            return

        asctime = self.format(record)

        self.lttng_ust.py_tracepoint(asctime.encode(),
                record.getMessage().encode(), record.name.encode(),
                record.funcName.encode(), record.lineno, record.levelno,
                record.thread, record.threadName.encode())

    def enable_event(self, name):
        """
        Enable an event name. The first enabled event attaches this handler to
        the root logger. Enabling an already enabled name does nothing.
        """
        # Don't update the refcount if the event has been enabled prior.
        if name in self.enabled_events:
            return

        # Get the root logger and attach our handler.
        root_logger = logging.getLogger()
        # First thing first, we need to set the root logger to the loglevel
        # NOTSET so we can catch everything. The default is 30.
        root_logger.setLevel(logging.NOTSET)

        self.refcount += 1
        if self.refcount == 1:
            root_logger.addHandler(self)

        self.enabled_events[name] = True

    def disable_event(self, name):
        """
        Disable an event name. Disabling the last enabled event detaches this
        handler from the root logger. Disabling a name that is not enabled
        does nothing.
        """
        if name not in self.enabled_events:
            # Event was not enabled prior, do nothing.
            return

        del self.enabled_events[name]

        self.refcount -= 1
        if self.refcount == 0:
            # No enabled event left: detach our handler from the root logger.
            logging.getLogger().removeHandler(self)

    def list_logger(self):
        """
        Return a list of the names of all loggers known to the logging module.

        A snapshot is returned rather than a live view, so that loggers created
        by other threads cannot change it while it is being iterated.
        """
        return list(logging.Logger.manager.loggerDict.keys())
187 | ||
class LTTngSessiondCmd():
    """
    Base class of the commands received from the session daemon. It also
    holds the constants of the agent protocol.
    """

    # Protocol version sent when registering to the session daemon.
    MAJOR_VERSION = 1
    MINOR_VERSION = 0

    # Python Logger LTTng domain value taken from lttng/domain.h
    DOMAIN = 5

    # Command identifiers of the agent protocol.
    CMD_LIST = 1
    CMD_ENABLE = 2
    CMD_DISABLE = 3
    CMD_REG_DONE = 4

    # Status codes placed at the start of a command reply.
    CODE_SUCCESS = 1
    CODE_INVALID_CMD = 2

    def execute(self):
        """
        Run the command. Concrete commands override this method; the base
        version only signals that it is missing.
        """
        raise NotImplementedError()
215 | ||
class LTTngCommandReply():
    """
    Result of executing a session daemon command.

    payload: extra bytes appended after the status code, or None.
    reply:   whether anything should be sent back to the session daemon.
    """

    def __init__(self, payload = None, reply = True):
        self.reply = reply
        self.payload = payload
225 | ||
class LTTngCommandEnable(LTTngSessiondCmd):
    """
    Handle the enable event command from the session daemon.

    Payload layout (big endian):
        uint32 loglevel
        uint32 loglevel_type
        char   name[]   (trailing NUL bytes are dropped)
    """

    def __init__(self, log_handler, data):
        self.log_handler = log_handler
        # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
        name_offset = 8

        # Too short to even hold the two loglevel fields.
        if len(data) < name_offset:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        self.loglevel, self.loglevel_type = struct.unpack_from('>II', data)

        # Keep only the C string: everything from the first NUL byte on is the
        # terminator or padding. A name that is not valid UTF-8 is reported as
        # an invalid command; a raw UnicodeDecodeError would not be caught by
        # the client and would end its thread.
        raw_name = bytes(data[name_offset:]).split(b'\x00', 1)[0]
        try:
            self.name = raw_name.decode()
        except UnicodeDecodeError:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        self.log_handler.enable_event(self.name)
        return LTTngCommandReply()
251 | ||
class LTTngCommandDisable(LTTngSessiondCmd):
    """
    Handle the disable event command from the session daemon.

    The whole payload is the event name, with trailing NUL bytes dropped.
    """

    def __init__(self, log_handler, data):
        self.log_handler = log_handler

        if len(data) == 0:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        # Keep only the C string: everything from the first NUL byte on is the
        # terminator or padding. A name that is not valid UTF-8 is reported as
        # an invalid command; a raw UnicodeDecodeError would not be caught by
        # the client and would end its thread.
        raw_name = bytes(data).split(b'\x00', 1)[0]
        try:
            self.name = raw_name.decode()
        except UnicodeDecodeError:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        self.log_handler.disable_event(self.name)
        return LTTngCommandReply()
274 | ||
class LTTngCommandRegDone(LTTngSessiondCmd):
    """
    Handle the register done command, which the session daemon sends after a
    successful registration. Executing it gives one unit back to the register
    semaphore so the waiting agent can return to its caller. Nothing is sent
    back to the session daemon for this command.
    """

    def __init__(self, sem):
        self.sem = sem

    def execute(self):
        no_reply = LTTngCommandReply(reply = False)
        self.sem.release()
        return no_reply
288 | ||
class LTTngCommandList(LTTngSessiondCmd):
    """
    Handle the list command from the session daemon by replying with the
    names of the loggers reported by the log handler.
    """

    def __init__(self, log_handler):
        self.log_handler = log_handler

    def execute(self):
        """
        Build the reply payload (big endian):
            uint32 data_size   total size of all names, NUL bytes included
            uint32 nb_event    number of logger names
            char   names[]     each name UTF-8 encoded and NUL-terminated
        """
        # Encode first so that sizes are counted in bytes. Counting characters
        # truncated non-ASCII names and announced a wrong data_size.
        names = [name.encode() for name in self.log_handler.list_logger()]

        # nb_event must precede the names.
        logger_data = bytearray(struct.pack('>I', len(names)))
        data_size = 0
        for name in names:
            logger_data += name + b'\x00'
            data_size += len(name) + 1

        data = struct.pack('>I', data_size) + bytes(logger_data)
        return LTTngCommandReply(payload = data)
317 | ||
class LTTngTCPClient(threading.Thread):
    """
    TCP client thread that registers to a session daemon, then receives and
    answers its commands. Whenever connecting, registering or talking to the
    session daemon fails, it waits WAIT_TIME seconds and starts over, until
    destroy() is called.
    """

    SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
    USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")

    # The time in seconds this client should wait before trying again to
    # register back to the session daemon.
    WAIT_TIME = 3

    # Largest amount of bytes requested by a single recv() call, so that a
    # bogus size in a command header does not trigger one huge allocation.
    RECV_CHUNK_SIZE = 65536

    def __init__(self, host, sem):
        threading.Thread.__init__(self)

        # Which host to connect to. The port is read from a port file on each
        # connection attempt.
        self.sessiond_host = host

        # The session daemon register done semaphore. One unit is taken here.
        # It is given back on CMD_REG_DONE, or when a connection attempt fails
        # so that the agent does not wait for an absent session daemon.
        self.register_sem = sem
        self.register_sem.acquire()

        # Indicate that we have to quit thus stop the main loop.
        self.quit_flag = False
        # Quit pipe. The thread polls its read end to know when to quit.
        self.quit_pipe = os.pipe()
        # Whether destroy() already closed the write end of the quit pipe.
        self._quit_pipe_closed = False

        # Socket on which we communicate with the session daemon.
        self.sessiond_sock = None
        # LTTng Logging Handler
        self.log_handler = LTTngLoggingHandler()

    def cleanup_socket(self, epfd = None):
        """
        Unregister the session daemon socket from epfd (when given), shut it
        down, close it and forget it.

        Each step is best-effort and attempted independently. The socket may
        never have been registered to epfd, or never been connected, and
        neither case may prevent the descriptor from being closed.
        """
        sock = self.sessiond_sock
        if not sock:
            return

        if epfd is not None:
            try:
                epfd.unregister(sock)
            except (IOError, ValueError):
                # Not registered, or already closed (negative fileno).
                pass
        try:
            sock.shutdown(SHUT_RDWR)
        except IOError:
            # Typically ENOTCONN when connect() never succeeded.
            pass
        try:
            sock.close()
        except IOError:
            pass

        self.sessiond_sock = None

    def destroy(self):
        """
        Ask the thread to stop: set quit_flag and write to the quit pipe so that
        a blocked epoll wakes up, then close the write end.

        Only the first call touches the pipe. Once closed, its descriptor
        number may be reused by an unrelated file, which must not be written
        to or closed again.
        """
        self.quit_flag = True
        if getattr(self, '_quit_pipe_closed', False):
            return
        self._quit_pipe_closed = True

        wfd = self.quit_pipe[1]
        try:
            os.write(wfd, b"42")
        except OSError:
            # E.g. EPIPE when the thread already exited and closed the read end.
            pass
        finally:
            try:
                os.close(wfd)
            except OSError:
                pass

    def register(self):
        """
        Register to the session daemon using the previously connected socket.

        Command ABI (big endian):
            uint32 domain
            uint32 pid
            uint32 major_version
            uint32 minor_version
        """
        data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(),
                LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION)
        # send() may transmit only part of the buffer.
        self.sessiond_sock.sendall(data)

    def run(self):
        """
        Thread body: connect and register to the session daemon, then serve
        its commands. On any IOError, drop the connection, wait WAIT_TIME
        seconds and retry, until destroy() is called.
        """
        epfd = epoll()
        epfd.register(self.quit_pipe[0], EPOLLIN)

        try:
            # Main loop to handle session daemon command and disconnection.
            while not self.quit_flag:
                try:
                    # First, connect to the session daemon.
                    self.connect_sessiond()

                    # Register to session daemon after a successful connection.
                    self.register()
                    # Add registered socket to poll set.
                    epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP)

                    self.quit_flag = self.wait_cmd(epfd)
                except IOError:
                    # Whatever happens here, we have to close down everything
                    # and retry to connect to the session daemon since either
                    # the socket is closed or invalid data was sent.
                    self.cleanup_socket(epfd)
                    # Let the agent stop waiting even though no session daemon
                    # answered. This may release more than once across retries.
                    self.register_sem.release()
                    # Wait before retrying. Only the quit pipe is left in epfd,
                    # so destroy() interrupts the wait right away instead of
                    # delaying shutdown by up to WAIT_TIME seconds.
                    epfd.poll(LTTngTCPClient.WAIT_TIME)
        finally:
            # Release everything even if an unexpected exception ends the thread.
            self.cleanup_socket(epfd)
            os.close(self.quit_pipe[0])
            epfd.close()

    def _recv_exact(self, sock, size):
        """
        Read exactly size bytes from sock and return them as bytes.

        Raises IOError(ESHUTDOWN) if the peer closes the connection first.
        """
        buf = bytearray()
        while len(buf) < size:
            chunk = sock.recv(min(size - len(buf), LTTngTCPClient.RECV_CHUNK_SIZE))
            if not chunk:
                raise IOError(errno.ESHUTDOWN)
            buf += chunk
        return bytes(buf)

    def recv_header(self, sock):
        """
        Receive a command header from the given socket and return it as the
        tuple (data_size, cmd, cmd_version).

        Header ABI (big endian):
            uint64 data_size
            uint32 cmd
            uint32 cmd_version

        Raises IOError(ESHUTDOWN) if the connection closes before the whole
        header has been read.
        """
        s_pack = struct.Struct('>QII')
        # A single recv() may return only part of the header.
        pack_data = self._recv_exact(sock, s_pack.size)
        return s_pack.unpack(pack_data)

    def create_command(self, cmd_type, version, data):
        """
        Return the right command object using the given command type. The
        command version is unused since we only have one for now.

        Raises LTTngUnknownCmdError for an unknown command type.
        """

        cmd_dict = {
            LTTngSessiondCmd.CMD_LIST: \
                lambda: LTTngCommandList(self.log_handler),
            LTTngSessiondCmd.CMD_ENABLE: \
                lambda: LTTngCommandEnable(self.log_handler, data),
            LTTngSessiondCmd.CMD_DISABLE: \
                lambda: LTTngCommandDisable(self.log_handler, data),
            LTTngSessiondCmd.CMD_REG_DONE: \
                lambda: LTTngCommandRegDone(self.register_sem),
        }

        if cmd_type in cmd_dict:
            return cmd_dict[cmd_type]()
        else:
            raise LTTngUnknownCmdError()

    def pack_code(self, code):
        """Pack a reply status code as a big endian uint32."""
        return struct.pack('>I', code)

    def handle_command(self, data, cmd_type, cmd_version):
        """
        Build and execute the command for cmd_type with the received payload.
        Then send the status code (followed by the command payload on success)
        to the session daemon, unless the command asked for no reply.
        """
        payload = bytearray()
        # Error paths have no command reply object; they always answer.
        reply = True

        try:
            cmd = self.create_command(cmd_type, cmd_version, data)
            cmd_reply = cmd.execute()
            # Set success code in data
            payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
            if cmd_reply.payload is not None:
                payload += cmd_reply.payload
            reply = cmd_reply.reply
        except LTTngCmdError as e:
            # Set error code in payload
            payload += self.pack_code(e.get_code())
        except LTTngUnknownCmdError:
            # Set error code in payload
            payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)

        # Send response only if asked for.
        if reply:
            self.sessiond_sock.sendall(payload)

    def wait_cmd(self, epfd):
        """
        Poll epfd and serve session daemon commands until asked to quit.

        Returns True when the quit pipe becomes readable. Raises IOError when
        the session daemon socket reports an error or hang-up, when the
        connection closes mid-message, or when polling fails.
        """

        while True:
            try:
                # Poll on socket for command.
                events = epfd.poll()
            except select.error as e:
                # select.error is OSError on Python 3 and has no 'message'.
                raise IOError(e.errno, e.strerror)

            for fileno, event in events:
                if fileno == self.quit_pipe[0]:
                    return True
                elif event & (EPOLLERR | EPOLLHUP):
                    raise IOError(errno.ESHUTDOWN)
                elif event & EPOLLIN:
                    data = bytearray()

                    data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock)
                    if data_size:
                        # A single recv() may return only part of the payload.
                        data += self._recv_exact(self.sessiond_sock, data_size)

                    self.handle_command(data, cmd, cmd_version)
                else:
                    raise IOError(errno.ESHUTDOWN)

    def get_port_from_file(self, path):
        """
        Read the session daemon agent port from the given port file.

        Returns 0 when the file cannot be read, does not start with an integer,
        or holds a value outside the TCP port range 1..65535.
        """

        # By default, the port is set to 0 so if we can not find the agent port
        # file simply don't try to connect. A value set to 0 indicates that.
        port = 0

        try:
            with open(path, "r") as f:
                r_port = int(f.readline())
        except (IOError, ValueError):
            return port

        if 0 < r_port <= 65535:
            port = r_port

        return port

    def connect_sessiond(self):
        """
        Connect sessiond_sock to the running session daemon, reading the port
        from the system port file for the root client and from the user port
        file otherwise.

        Raises IOError(ECONNREFUSED) when no valid port was found; connect()
        itself may also raise IOError.
        """
        # Create session daemon TCP socket
        if not self.sessiond_sock:
            self.sessiond_sock = socket(AF_INET, SOCK_STREAM)

        if self.log_handler.is_root:
            port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
        else:
            port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)

        # No session daemon available
        if port == 0:
            raise IOError(errno.ECONNREFUSED)

        # Can raise an IOError so caller must catch it.
        self.sessiond_sock.connect((self.sessiond_host, port))