Commit | Line | Data |
---|---|---|
de4dee04 PP |
1 | # -*- coding: utf-8 -*- |
2 | # | |
3 | # Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com> | |
4 | # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com> | |
5 | # | |
6 | # This library is free software; you can redistribute it and/or modify it under | |
7 | # the terms of the GNU Lesser General Public License as published by the Free | |
8 | # Software Foundation; version 2.1 of the License. | |
9 | # | |
10 | # This library is distributed in the hope that it will be useful, but WITHOUT | |
11 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
12 | # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more | |
13 | # details. | |
14 | # | |
15 | # You should have received a copy of the GNU Lesser General Public License | |
16 | # along with this library; if not, write to the Free Software Foundation, Inc., | |
17 | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | ||
19 | from __future__ import unicode_literals | |
20 | from __future__ import print_function | |
21 | from __future__ import division | |
22 | import lttngust.debug as dbg | |
23 | import lttngust.loghandler | |
e7bf4968 | 24 | import lttngust.compat |
de4dee04 PP |
25 | import lttngust.cmd |
26 | from io import open | |
27 | import threading | |
28 | import logging | |
29 | import socket | |
30 | import time | |
31 | import sys | |
32 | import os | |
33 | ||
34 | ||
35 | try: | |
36 | # Python 2 | |
37 | import Queue as queue | |
38 | except ImportError: | |
39 | # Python 3 | |
40 | import queue | |
41 | ||
42 | ||
43 | _PROTO_DOMAIN = 5 | |
b52ff352 | 44 | _PROTO_MAJOR = 2 |
de4dee04 PP |
45 | _PROTO_MINOR = 0 |
46 | ||
47 | ||
48 | def _get_env_value_ms(key, default_s): | |
49 | try: | |
50 | val = int(os.getenv(key, default_s * 1000)) / 1000 | |
51 | except: | |
52 | val = -1 | |
53 | ||
54 | if val < 0: | |
55 | fmt = 'invalid ${} value; {} seconds will be used' | |
56 | dbg._pwarning(fmt.format(key, default_s)) | |
57 | val = default_s | |
58 | ||
59 | return val | |
60 | ||
61 | ||
# Registration timing, overridable through the environment (variables
# hold milliseconds; the values stored here are seconds).

# Maximum time _init_threads() blocks waiting for "registration done".
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)

# Pause of a client thread before it reconnects after any error.
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
64 | ||
65 | ||
66 | class _TcpClient(object): | |
67 | def __init__(self, name, host, port, reg_queue): | |
68 | super(self.__class__, self).__init__() | |
69 | self._name = name | |
70 | self._host = host | |
71 | self._port = port | |
72 | ||
73 | try: | |
74 | self._log_handler = lttngust.loghandler._Handler() | |
75 | except (OSError) as e: | |
76 | dbg._pwarning('cannot load library: {}'.format(e)) | |
77 | raise e | |
78 | ||
79 | self._root_logger = logging.getLogger() | |
80 | self._root_logger.setLevel(logging.NOTSET) | |
81 | self._ref_count = 0 | |
82 | self._sessiond_sock = None | |
83 | self._reg_queue = reg_queue | |
84 | self._server_cmd_handlers = { | |
85 | lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done, | |
86 | lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable, | |
87 | lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable, | |
88 | lttngust.cmd._ServerCmdList: self._handle_server_cmd_list, | |
89 | } | |
90 | ||
91 | def _debug(self, msg): | |
92 | return 'client "{}": {}'.format(self._name, msg) | |
93 | ||
94 | def run(self): | |
95 | while True: | |
96 | try: | |
97 | # connect to the session daemon | |
98 | dbg._pdebug(self._debug('connecting to session daemon')) | |
99 | self._connect_to_sessiond() | |
100 | ||
101 | # register to the session daemon after a successful connection | |
102 | dbg._pdebug(self._debug('registering to session daemon')) | |
103 | self._register() | |
104 | ||
105 | # wait for commands from the session daemon | |
106 | self._wait_server_cmd() | |
107 | except (Exception) as e: | |
108 | # Whatever happens here, we have to close the socket and | |
109 | # retry to connect to the session daemon since either | |
110 | # the socket was closed, a network timeout occured, or | |
111 | # invalid data was received. | |
112 | dbg._pdebug(self._debug('got exception: {}'.format(e))) | |
113 | self._cleanup_socket() | |
114 | dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY))) | |
115 | time.sleep(_RETRY_REG_DELAY) | |
116 | ||
117 | def _recv_server_cmd_header(self): | |
118 | data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE) | |
119 | ||
120 | if not data: | |
121 | dbg._pdebug(self._debug('received empty server command header')) | |
122 | return None | |
123 | ||
124 | assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE) | |
125 | dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data)))) | |
126 | ||
127 | return lttngust.cmd._server_cmd_header_from_data(data) | |
128 | ||
129 | def _recv_server_cmd(self): | |
130 | server_cmd_header = self._recv_server_cmd_header() | |
131 | ||
132 | if server_cmd_header is None: | |
133 | return None | |
134 | ||
135 | dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size))) | |
136 | dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id))) | |
137 | dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version))) | |
138 | data = bytes() | |
139 | ||
140 | if server_cmd_header.data_size > 0: | |
141 | data = self._sessiond_sock.recv(server_cmd_header.data_size) | |
142 | assert(len(data) == server_cmd_header.data_size) | |
143 | ||
144 | return lttngust.cmd._server_cmd_from_data(server_cmd_header, data) | |
145 | ||
146 | def _send_cmd_reply(self, cmd_reply): | |
147 | data = cmd_reply.get_data() | |
148 | dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data)))) | |
149 | self._sessiond_sock.sendall(data) | |
150 | ||
151 | def _handle_server_cmd_reg_done(self, server_cmd): | |
152 | dbg._pdebug(self._debug('got "registration done" server command')) | |
153 | ||
154 | if self._reg_queue is not None: | |
155 | dbg._pdebug(self._debug('notifying _init_threads()')) | |
156 | ||
157 | try: | |
158 | self._reg_queue.put(True) | |
159 | except (Exception) as e: | |
160 | # read side could be closed by now; ignore it | |
161 | pass | |
162 | ||
163 | self._reg_queue = None | |
164 | ||
165 | def _handle_server_cmd_enable(self, server_cmd): | |
166 | dbg._pdebug(self._debug('got "enable" server command')) | |
167 | self._ref_count += 1 | |
168 | ||
169 | if self._ref_count == 1: | |
170 | dbg._pdebug(self._debug('adding our handler to the root logger')) | |
171 | self._root_logger.addHandler(self._log_handler) | |
172 | ||
173 | dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) | |
174 | ||
175 | return lttngust.cmd._ClientCmdReplyEnable() | |
176 | ||
177 | def _handle_server_cmd_disable(self, server_cmd): | |
178 | dbg._pdebug(self._debug('got "disable" server command')) | |
179 | self._ref_count -= 1 | |
180 | ||
181 | if self._ref_count < 0: | |
182 | # disable command could be sent again when a session is destroyed | |
183 | self._ref_count = 0 | |
184 | ||
185 | if self._ref_count == 0: | |
186 | dbg._pdebug(self._debug('removing our handler from the root logger')) | |
187 | self._root_logger.removeHandler(self._log_handler) | |
188 | ||
189 | dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) | |
190 | ||
191 | return lttngust.cmd._ClientCmdReplyDisable() | |
192 | ||
193 | def _handle_server_cmd_list(self, server_cmd): | |
194 | dbg._pdebug(self._debug('got "list" server command')) | |
195 | names = logging.Logger.manager.loggerDict.keys() | |
196 | dbg._pdebug(self._debug('found {} loggers'.format(len(names)))) | |
197 | cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names) | |
198 | ||
199 | return cmd_reply | |
200 | ||
201 | def _handle_server_cmd(self, server_cmd): | |
202 | cmd_reply = None | |
203 | ||
204 | if server_cmd is None: | |
205 | dbg._pdebug(self._debug('bad server command')) | |
206 | status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD | |
207 | cmd_reply = lttngust.cmd._ClientCmdReply(status) | |
208 | elif type(server_cmd) in self._server_cmd_handlers: | |
209 | cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd) | |
210 | else: | |
211 | dbg._pdebug(self._debug('unknown server command')) | |
212 | status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD | |
213 | cmd_reply = lttngust.cmd._ClientCmdReply(status) | |
214 | ||
215 | if cmd_reply is not None: | |
216 | self._send_cmd_reply(cmd_reply) | |
217 | ||
218 | def _wait_server_cmd(self): | |
219 | while True: | |
220 | try: | |
221 | server_cmd = self._recv_server_cmd() | |
222 | except socket.timeout: | |
223 | # simply retry here; the protocol has no KA and we could | |
224 | # wait for hours | |
225 | continue | |
226 | ||
227 | self._handle_server_cmd(server_cmd) | |
228 | ||
229 | def _cleanup_socket(self): | |
230 | try: | |
231 | self._sessiond_sock.shutdown(socket.SHUT_RDWR) | |
232 | self._sessiond_sock.close() | |
233 | except: | |
234 | pass | |
235 | ||
236 | self._sessiond_sock = None | |
237 | ||
238 | def _connect_to_sessiond(self): | |
239 | # create session daemon TCP socket | |
240 | if self._sessiond_sock is None: | |
241 | self._sessiond_sock = socket.socket(socket.AF_INET, | |
242 | socket.SOCK_STREAM) | |
243 | ||
244 | # Use str(self._host) here. Since this host could be a string | |
245 | # literal, and since we're importing __future__.unicode_literals, | |
246 | # we want to make sure the host is a native string in Python 2. | |
247 | # This avoids an indirect module import (unicode module to | |
248 | # decode the unicode string, eventually imported by the | |
249 | # socket module if needed), which is not allowed in a thread | |
250 | # directly created by a module in Python 2 (our case). | |
251 | # | |
252 | # tl;dr: Do NOT remove str() here, or this call in Python 2 | |
253 | # _will_ block on an interpreter's mutex until the waiting | |
254 | # register queue timeouts. | |
255 | self._sessiond_sock.connect((str(self._host), self._port)) | |
256 | ||
257 | def _register(self): | |
258 | cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(), | |
259 | _PROTO_MAJOR, _PROTO_MINOR) | |
260 | data = cmd.get_data() | |
261 | self._sessiond_sock.sendall(data) | |
262 | ||
263 | ||
def _get_port_from_file(path):
    """Read a session daemon TCP port number from a file.

    :param path: path of a file whose first line holds the port number.
    :returns: the port as an int in [1, 65535], or None if the file cannot
              be read, its first line is not an integer, or the value is
              outside the valid TCP port range.
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # `with` closes the file even when int() rejects its content.
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file, or non-integer (or undecodable) content
        return None

    # Both bounds must hold for a valid TCP port.
    if 0 < r_port <= 65535:
        return r_port

    return None
279 | ||
280 | ||
281 | def _get_user_home_path(): | |
282 | # $LTTNG_HOME overrides $HOME if it exists | |
283 | return os.getenv('LTTNG_HOME', os.path.expanduser('~')) | |
284 | ||
285 | ||
286 | _initialized = False | |
287 | _SESSIOND_HOST = '127.0.0.1' | |
288 | ||
289 | ||
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: build a client for `port` and run it.

    :param name: client name (also used as the thread name by the caller).
    :param port: local session daemon TCP port.
    :param reg_queue: queue notified once registration is confirmed.

    _TcpClient.run() loops forever, so this only returns by raising,
    e.g. if the client's constructor fails.
    """
    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)

    dbg._pdebug('starting client "{}"'.format(name))
    tcp_client.run()
295 | ||
296 | ||
def _start_client_thread(name, port, reg_queue):
    """Start a daemon thread running client `name` against `port`."""
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _init_threads():
    """Start the agent's client threads and wait for their registration.

    Runs only once (guarded by the module-level `_initialized` flag). The
    system and user session daemon ports are read from their `agent.port`
    files, and one daemon client thread is started per distinct valid port.
    The function then blocks for at most _REG_TIMEOUT seconds, until every
    started client has reported "registration done" on a shared queue.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception:
        # Cannot create threads for some reason (e.g. RuntimeError from
        # Thread.start()); stop this initialization. KeyboardInterrupt
        # and SystemExit are deliberately not caught.
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                          cur_timeout))
        t1 = lttngust.compat._clock()

        try:
            reg_queue.get(timeout=cur_timeout)
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

        t2 = lttngust.compat._clock()
        reg_expecting -= 1
        dbg._pdebug('unblocked')

        if reg_expecting == 0:
            # done!
            dbg._pdebug('successfully registered to session daemon(s)')
            break

        # Spend the remaining budget on the next confirmation.
        cur_timeout -= (t2 - t1)

        if cur_timeout <= 0:
            # timeout
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
394 | ||
395 | ||
# Importing this module starts the agent: client threads are created and
# registration with the session daemon(s) is awaited right away.
_init_threads()