Commit | Line | Data |
---|---|---|
de4dee04 PP |
1 | # -*- coding: utf-8 -*- |
2 | # | |
3 | # Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com> | |
4 | # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com> | |
5 | # | |
6 | # This library is free software; you can redistribute it and/or modify it under | |
7 | # the terms of the GNU Lesser General Public License as published by the Free | |
8 | # Software Foundation; version 2.1 of the License. | |
9 | # | |
10 | # This library is distributed in the hope that it will be useful, but WITHOUT | |
11 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
12 | # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more | |
13 | # details. | |
14 | # | |
15 | # You should have received a copy of the GNU Lesser General Public License | |
16 | # along with this library; if not, write to the Free Software Foundation, Inc., | |
17 | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | ||
19 | from __future__ import unicode_literals | |
20 | from __future__ import print_function | |
21 | from __future__ import division | |
22 | import lttngust.debug as dbg | |
23 | import lttngust.loghandler | |
24 | import lttngust.cmd | |
25 | from io import open | |
26 | import threading | |
27 | import logging | |
28 | import socket | |
29 | import time | |
30 | import sys | |
31 | import os | |
32 | ||
33 | ||
34 | try: | |
35 | # Python 2 | |
36 | import Queue as queue | |
37 | except ImportError: | |
38 | # Python 3 | |
39 | import queue | |
40 | ||
41 | ||
42 | _PROTO_DOMAIN = 5 | |
b52ff352 | 43 | _PROTO_MAJOR = 2 |
de4dee04 PP |
44 | _PROTO_MINOR = 0 |
45 | ||
46 | ||
47 | def _get_env_value_ms(key, default_s): | |
48 | try: | |
49 | val = int(os.getenv(key, default_s * 1000)) / 1000 | |
50 | except: | |
51 | val = -1 | |
52 | ||
53 | if val < 0: | |
54 | fmt = 'invalid ${} value; {} seconds will be used' | |
55 | dbg._pwarning(fmt.format(key, default_s)) | |
56 | val = default_s | |
57 | ||
58 | return val | |
59 | ||
60 | ||
# Seconds _init_threads() blocks waiting for registration confirmation(s).
# Overridable through the environment, in milliseconds.
_REG_TIMEOUT = _get_env_value_ms(key='LTTNG_UST_PYTHON_REGISTER_TIMEOUT',
                                 default_s=5)
# Seconds a client waits before reconnecting after any failure.
# Overridable through the environment, in milliseconds.
_RETRY_REG_DELAY = _get_env_value_ms(
    key='LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', default_s=3)
63 | ||
64 | ||
65 | class _TcpClient(object): | |
66 | def __init__(self, name, host, port, reg_queue): | |
67 | super(self.__class__, self).__init__() | |
68 | self._name = name | |
69 | self._host = host | |
70 | self._port = port | |
71 | ||
72 | try: | |
73 | self._log_handler = lttngust.loghandler._Handler() | |
74 | except (OSError) as e: | |
75 | dbg._pwarning('cannot load library: {}'.format(e)) | |
76 | raise e | |
77 | ||
78 | self._root_logger = logging.getLogger() | |
79 | self._root_logger.setLevel(logging.NOTSET) | |
80 | self._ref_count = 0 | |
81 | self._sessiond_sock = None | |
82 | self._reg_queue = reg_queue | |
83 | self._server_cmd_handlers = { | |
84 | lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done, | |
85 | lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable, | |
86 | lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable, | |
87 | lttngust.cmd._ServerCmdList: self._handle_server_cmd_list, | |
88 | } | |
89 | ||
90 | def _debug(self, msg): | |
91 | return 'client "{}": {}'.format(self._name, msg) | |
92 | ||
93 | def run(self): | |
94 | while True: | |
95 | try: | |
96 | # connect to the session daemon | |
97 | dbg._pdebug(self._debug('connecting to session daemon')) | |
98 | self._connect_to_sessiond() | |
99 | ||
100 | # register to the session daemon after a successful connection | |
101 | dbg._pdebug(self._debug('registering to session daemon')) | |
102 | self._register() | |
103 | ||
104 | # wait for commands from the session daemon | |
105 | self._wait_server_cmd() | |
106 | except (Exception) as e: | |
107 | # Whatever happens here, we have to close the socket and | |
108 | # retry to connect to the session daemon since either | |
109 | # the socket was closed, a network timeout occured, or | |
110 | # invalid data was received. | |
111 | dbg._pdebug(self._debug('got exception: {}'.format(e))) | |
112 | self._cleanup_socket() | |
113 | dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY))) | |
114 | time.sleep(_RETRY_REG_DELAY) | |
115 | ||
116 | def _recv_server_cmd_header(self): | |
117 | data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE) | |
118 | ||
119 | if not data: | |
120 | dbg._pdebug(self._debug('received empty server command header')) | |
121 | return None | |
122 | ||
123 | assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE) | |
124 | dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data)))) | |
125 | ||
126 | return lttngust.cmd._server_cmd_header_from_data(data) | |
127 | ||
128 | def _recv_server_cmd(self): | |
129 | server_cmd_header = self._recv_server_cmd_header() | |
130 | ||
131 | if server_cmd_header is None: | |
132 | return None | |
133 | ||
134 | dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size))) | |
135 | dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id))) | |
136 | dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version))) | |
137 | data = bytes() | |
138 | ||
139 | if server_cmd_header.data_size > 0: | |
140 | data = self._sessiond_sock.recv(server_cmd_header.data_size) | |
141 | assert(len(data) == server_cmd_header.data_size) | |
142 | ||
143 | return lttngust.cmd._server_cmd_from_data(server_cmd_header, data) | |
144 | ||
145 | def _send_cmd_reply(self, cmd_reply): | |
146 | data = cmd_reply.get_data() | |
147 | dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data)))) | |
148 | self._sessiond_sock.sendall(data) | |
149 | ||
150 | def _handle_server_cmd_reg_done(self, server_cmd): | |
151 | dbg._pdebug(self._debug('got "registration done" server command')) | |
152 | ||
153 | if self._reg_queue is not None: | |
154 | dbg._pdebug(self._debug('notifying _init_threads()')) | |
155 | ||
156 | try: | |
157 | self._reg_queue.put(True) | |
158 | except (Exception) as e: | |
159 | # read side could be closed by now; ignore it | |
160 | pass | |
161 | ||
162 | self._reg_queue = None | |
163 | ||
164 | def _handle_server_cmd_enable(self, server_cmd): | |
165 | dbg._pdebug(self._debug('got "enable" server command')) | |
166 | self._ref_count += 1 | |
167 | ||
168 | if self._ref_count == 1: | |
169 | dbg._pdebug(self._debug('adding our handler to the root logger')) | |
170 | self._root_logger.addHandler(self._log_handler) | |
171 | ||
172 | dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) | |
173 | ||
174 | return lttngust.cmd._ClientCmdReplyEnable() | |
175 | ||
176 | def _handle_server_cmd_disable(self, server_cmd): | |
177 | dbg._pdebug(self._debug('got "disable" server command')) | |
178 | self._ref_count -= 1 | |
179 | ||
180 | if self._ref_count < 0: | |
181 | # disable command could be sent again when a session is destroyed | |
182 | self._ref_count = 0 | |
183 | ||
184 | if self._ref_count == 0: | |
185 | dbg._pdebug(self._debug('removing our handler from the root logger')) | |
186 | self._root_logger.removeHandler(self._log_handler) | |
187 | ||
188 | dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) | |
189 | ||
190 | return lttngust.cmd._ClientCmdReplyDisable() | |
191 | ||
192 | def _handle_server_cmd_list(self, server_cmd): | |
193 | dbg._pdebug(self._debug('got "list" server command')) | |
194 | names = logging.Logger.manager.loggerDict.keys() | |
195 | dbg._pdebug(self._debug('found {} loggers'.format(len(names)))) | |
196 | cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names) | |
197 | ||
198 | return cmd_reply | |
199 | ||
200 | def _handle_server_cmd(self, server_cmd): | |
201 | cmd_reply = None | |
202 | ||
203 | if server_cmd is None: | |
204 | dbg._pdebug(self._debug('bad server command')) | |
205 | status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD | |
206 | cmd_reply = lttngust.cmd._ClientCmdReply(status) | |
207 | elif type(server_cmd) in self._server_cmd_handlers: | |
208 | cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd) | |
209 | else: | |
210 | dbg._pdebug(self._debug('unknown server command')) | |
211 | status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD | |
212 | cmd_reply = lttngust.cmd._ClientCmdReply(status) | |
213 | ||
214 | if cmd_reply is not None: | |
215 | self._send_cmd_reply(cmd_reply) | |
216 | ||
217 | def _wait_server_cmd(self): | |
218 | while True: | |
219 | try: | |
220 | server_cmd = self._recv_server_cmd() | |
221 | except socket.timeout: | |
222 | # simply retry here; the protocol has no KA and we could | |
223 | # wait for hours | |
224 | continue | |
225 | ||
226 | self._handle_server_cmd(server_cmd) | |
227 | ||
228 | def _cleanup_socket(self): | |
229 | try: | |
230 | self._sessiond_sock.shutdown(socket.SHUT_RDWR) | |
231 | self._sessiond_sock.close() | |
232 | except: | |
233 | pass | |
234 | ||
235 | self._sessiond_sock = None | |
236 | ||
237 | def _connect_to_sessiond(self): | |
238 | # create session daemon TCP socket | |
239 | if self._sessiond_sock is None: | |
240 | self._sessiond_sock = socket.socket(socket.AF_INET, | |
241 | socket.SOCK_STREAM) | |
242 | ||
243 | # Use str(self._host) here. Since this host could be a string | |
244 | # literal, and since we're importing __future__.unicode_literals, | |
245 | # we want to make sure the host is a native string in Python 2. | |
246 | # This avoids an indirect module import (unicode module to | |
247 | # decode the unicode string, eventually imported by the | |
248 | # socket module if needed), which is not allowed in a thread | |
249 | # directly created by a module in Python 2 (our case). | |
250 | # | |
251 | # tl;dr: Do NOT remove str() here, or this call in Python 2 | |
252 | # _will_ block on an interpreter's mutex until the waiting | |
253 | # register queue timeouts. | |
254 | self._sessiond_sock.connect((str(self._host), self._port)) | |
255 | ||
256 | def _register(self): | |
257 | cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(), | |
258 | _PROTO_MAJOR, _PROTO_MINOR) | |
259 | data = cmd.get_data() | |
260 | self._sessiond_sock.sendall(data) | |
261 | ||
262 | ||
def _get_port_from_file(path):
    """Return the TCP port number written on the first line of *path*.

    Returns None when the file cannot be opened or read, when its first
    line is not an integer, or when the value lies outside 1-65535.
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # `with` closes the file even if readline()/int() raises
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file or non-integer content: no port
        return None

    # both bounds must hold for a valid TCP port
    if 0 < r_port <= 65535:
        return r_port

    return None
278 | ||
279 | ||
280 | def _get_user_home_path(): | |
281 | # $LTTNG_HOME overrides $HOME if it exists | |
282 | return os.getenv('LTTNG_HOME', os.path.expanduser('~')) | |
283 | ||
284 | ||
285 | _initialized = False | |
286 | _SESSIOND_HOST = '127.0.0.1' | |
287 | ||
288 | ||
def _client_thread_target(name, port, reg_queue):
    """Thread body: build a _TcpClient for *port* and run it.

    The client connects to _SESSIOND_HOST. Its run() method loops
    forever, so this normally never returns.
    """
    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
    tcp_client = _TcpClient(name=name, host=_SESSIOND_HOST, port=port,
                            reg_queue=reg_queue)

    dbg._pdebug('starting client "{}"'.format(name))
    tcp_client.run()
294 | ||
295 | ||
def _start_client_thread(name, port, reg_queue):
    """Start a daemon thread named *name* running a client for *port*."""
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _wait_for_registration(reg_queue, reg_expecting, timeout):
    """Block until *reg_expecting* confirmations arrive or *timeout* s elapse.

    Returns True if every expected confirmation was received in time.
    """
    # Wall-clock deadline. time.clock() is unsuitable: on Unix it measures
    # CPU time (which barely advances while blocked) and it no longer
    # exists in Python 3.8+. Prefer a monotonic clock (Python 3.3+) and
    # fall back to time.time() on Python 2.
    clock = getattr(time, 'monotonic', time.time)
    deadline = clock() + timeout

    while True:
        # A zero timeout still picks up an already-queued confirmation.
        remaining = max(deadline - clock(), 0)
        dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                           remaining))

        try:
            reg_queue.get(timeout=remaining)
        except queue.Empty:
            dbg._pdebug('ran out of time')
            return False

        reg_expecting -= 1
        dbg._pdebug('unblocked')

        if reg_expecting == 0:
            # done!
            dbg._pdebug('successfully registered to session daemon(s)')
            return True


def _init_threads():
    """Start the agent client threads and wait for their registration.

    One client thread is started per session daemon port found (system
    and user). The function then blocks for at most _REG_TIMEOUT seconds
    until each of them reports "registration done". Only the first call
    does anything; later calls return immediately.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    _wait_for_registration(reg_queue, reg_expecting, _REG_TIMEOUT)

    dbg._pdebug('leaving')
393 | ||
394 | ||
# Importing this module starts the agent: client threads are created and
# registration is awaited (bounded by _REG_TIMEOUT) as an import side effect.
_init_threads()