Commit | Line | Data |
---|---|---|
de4dee04 PP |
1 | # -*- coding: utf-8 -*- |
2 | # | |
c0c0989a | 3 | # SPDX-License-Identifier: LGPL-2.1-only |
de4dee04 | 4 | # |
c0c0989a MJ |
5 | # Copyright (C) 2015 Philippe Proulx <pproulx@efficios.com> |
6 | # Copyright (C) 2014 David Goulet <dgoulet@efficios.com> | |
de4dee04 PP |
7 | |
8 | from __future__ import unicode_literals | |
9 | from __future__ import print_function | |
10 | from __future__ import division | |
11 | import lttngust.debug as dbg | |
12 | import lttngust.loghandler | |
e7bf4968 | 13 | import lttngust.compat |
de4dee04 PP |
14 | import lttngust.cmd |
15 | from io import open | |
16 | import threading | |
17 | import logging | |
18 | import socket | |
19 | import time | |
20 | import sys | |
21 | import os | |
22 | ||
23 | ||
24 | try: | |
25 | # Python 2 | |
26 | import Queue as queue | |
27 | except ImportError: | |
28 | # Python 3 | |
29 | import queue | |
30 | ||
31 | ||
# Domain and protocol version (major.minor) that this agent announces
# in the registration command it sends to the session daemon.
_PROTO_DOMAIN = 5
_PROTO_MAJOR = 2
_PROTO_MINOR = 0
35 | ||
36 | ||
def _get_env_value_ms(key, default_s):
    """Return a duration in seconds read from an environment variable.

    key: name of the environment variable; its value is expected to be
        an integer number of milliseconds.
    default_s: number of seconds to use when the variable is not set.
        It is also the fallback, after a warning emitted through
        dbg._pwarning(), when the value is not an integer or is
        negative.

    Returns the duration in seconds (milliseconds divided by 1000).
    """
    try:
        val = int(os.getenv(key, default_s * 1000)) / 1000
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "invalid value"; anything else
        # (KeyboardInterrupt, SystemExit, ...) must propagate.
        val = -1

    if val < 0:
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
49 | ||
50 | ||
# Both values are in seconds; the environment variables that override
# them are expressed in milliseconds (see _get_env_value_ms()).
#
# _REG_TIMEOUT: total time _init_threads() waits for the "registration
# done" confirmations.
# _RETRY_REG_DELAY: time a client sleeps after a failure before it
# tries to connect to its session daemon again.
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
53 | ||
54 | ||
class _TcpClient(object):
    """Client of one LTTng session daemon, reached over TCP.

    run() connects to the session daemon, registers this process, and
    then serves the daemon's commands ("registration done", "enable",
    "disable" and "list"). On any error, the socket is closed and the
    whole sequence is retried after _RETRY_REG_DELAY seconds.
    """

    def __init__(self, name, host, port, reg_queue):
        """Initialize the client; no connection is attempted here.

        name: label used as a prefix of this client's debug messages.
        host: address of the session daemon.
        port: TCP port of the session daemon.
        reg_queue: queue in which True is put when the first
            "registration done" command is received, or None.

        Raises OSError if the LTTng-UST logging handler cannot be
        created.
        """
        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this class gets subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except OSError as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            # bare raise: keeps the original traceback in Python 2 too
            raise

        self._root_logger = logging.getLogger()
        self._root_logger.setLevel(logging.NOTSET)

        # number of "enable" commands not yet balanced by a "disable"
        self._ref_count = 0
        self._sessiond_sock = None
        self._reg_queue = reg_queue

        # server command class -> method handling it
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        """Return `msg` prefixed with this client's name."""
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and serve commands, retrying forever."""
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except Exception as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exact(self, size):
        """Read `size` bytes from the session daemon socket.

        A single recv() call may return fewer bytes than requested, so
        this keeps reading until `size` bytes are gathered. Fewer bytes
        (possibly none) are returned only if the peer closes the
        connection first.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            chunk = self._sessiond_sock.recv(remaining)

            if not chunk:
                # connection closed by the peer
                break

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        """Receive and decode a server command header.

        Returns None if the connection was closed before any byte of
        the header was received. Raises EOFError if it was closed in
        the middle of the header.
        """
        header_size = lttngust.cmd._SERVER_CMD_HEADER_SIZE
        data = self._recv_exact(header_size)

        if not data:
            dbg._pdebug(self._debug('received empty server command header'))
            return None

        # explicit check instead of an assert, which "python -O" removes
        if len(data) != header_size:
            raise EOFError('truncated server command header')

        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive a whole server command (header, then payload).

        Returns whatever lttngust.cmd._server_cmd_from_data() builds
        from the header and payload, or None if no header could be
        received. Raises EOFError if the payload is truncated.
        """
        server_cmd_header = self._recv_server_cmd_header()

        if server_cmd_header is None:
            return None

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
        data = b''

        if server_cmd_header.data_size > 0:
            data = self._recv_exact(server_cmd_header.data_size)

            if len(data) != server_cmd_header.data_size:
                raise EOFError('truncated server command payload')

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        """Send the serialized form of `cmd_reply` to the session daemon."""
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        """Handle "registration done": notify the registration queue once.

        Returns None, so no reply is sent for this command.
        """
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            # only the first registration is reported
            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        """Handle "enable": attach the log handler on the first enable."""
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        """Handle "disable": detach the log handler when nothing is enabled."""
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        """Handle "list": reply with the names of the known loggers."""
        dbg._pdebug(self._debug('got "list" server command'))

        # Take a snapshot: other threads may create loggers, and thus
        # modify this dictionary, while the reply is being built.
        names = list(logging.Logger.manager.loggerDict)
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)

        return cmd_reply

    def _handle_server_cmd(self, server_cmd):
        """Dispatch `server_cmd` to its handler and send the reply, if any.

        A missing (None) or unknown command is answered with an
        "invalid command" status.
        """
        cmd_reply = None

        if server_cmd is None:
            dbg._pdebug(self._debug('bad server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        elif type(server_cmd) in self._server_cmd_handlers:
            cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
        else:
            dbg._pdebug(self._debug('unknown server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        """Receive and handle server commands until an exception is raised."""
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no KA and we could
                # wait for hours
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Shut down and close the session daemon socket, if there is one.

        This never raises on socket errors: it is called from the error
        path of run(), where the socket may be in any state.
        """
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            sock.shutdown(socket.SHUT_RDWR)
        except (socket.error, OSError):
            # e.g. the socket was never connected; nothing to shut down
            pass
        finally:
            # Always release the descriptor, even when shutdown() fails.
            try:
                sock.close()
            except (socket.error, OSError):
                pass

    def _connect_to_sessiond(self):
        """Create the TCP socket if needed and connect it to the session daemon."""
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue times out.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        """Send the registration command (domain, PID, protocol version)."""
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
251 | ||
252 | ||
def _get_port_from_file(path):
    """Return the TCP port written on the first line of a file.

    path: path of the file to read.

    Returns the port as an integer, or None when the file cannot be
    opened or read, when its first line is not an integer, or when that
    integer is outside the valid TCP port range (1 to 65535).
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # the context manager closes the file even if the parsing fails
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file or non-numeric content: no port
        return None

    # both bounds must hold for the value to be a usable port
    if 0 < r_port <= 65535:
        return r_port

    return None
268 | ||
269 | ||
def _get_user_home_path():
    """Return the directory under which the user's `.lttng` files live.

    This is the value of $LTTNG_HOME when that variable is set, and the
    expansion of `~` otherwise.
    """
    lttng_home = os.environ.get('LTTNG_HOME')

    if lttng_home is None:
        # no override: fall back to the regular home directory
        return os.path.expanduser('~')

    return lttng_home
273 | ||
274 | ||
# Set to True by _init_threads() on its first call, so that later calls
# return immediately.
_initialized = False

# Address given to every _TcpClient as the session daemon host.
_SESSIOND_HOST = '127.0.0.1'
277 | ||
278 | ||
def _client_thread_target(name, port, reg_queue):
    """Entry point of a client thread: build a TCP client and run it.

    name: name of the client, used in its debug messages.
    port: TCP port of the session daemon to connect to.
    reg_queue: queue handed to the client for its registration
        notification.
    """
    creation_msg = 'creating client "{}" using TCP port {}'.format(name, port)
    dbg._pdebug(creation_msg)
    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)

    start_msg = 'starting client "{}"'.format(name)
    dbg._pdebug(start_msg)
    tcp_client.run()
284 | ||
285 | ||
def _start_client_thread(name, port, reg_queue):
    # Create and start the daemon thread which runs the client `name`.
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _init_threads():
    """Start the client threads and wait for their registration.

    One daemon thread is started for each session daemon (system and
    user) whose port file holds a port. This function then blocks
    until every started client has reported "registration done", or
    until _REG_TIMEOUT seconds have elapsed in total. Calls after the
    first one return immediately.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception:
        # Cannot create threads for some reason; stop this
        # initialization. KeyboardInterrupt and SystemExit are not
        # caught here on purpose.
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                            cur_timeout))
        t1 = lttngust.compat._clock()

        try:
            reg_queue.get(timeout=cur_timeout)
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

        t2 = lttngust.compat._clock()
        reg_expecting -= 1
        dbg._pdebug('unblocked')

        if reg_expecting == 0:
            # done!
            dbg._pdebug('successfully registered to session daemon(s)')
            break

        # the time budget is shared by all the expected registrations
        cur_timeout -= (t2 - t1)

        if cur_timeout <= 0:
            # timeout
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
383 | ||
384 | ||
# Executed when this module is imported: start the client threads and
# wait (see _init_threads()) for their registration.
_init_threads()