| 1 | # -*- coding: utf-8 -*- |
| 2 | # |
| 3 | # Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com> |
| 4 | # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com> |
| 5 | # |
| 6 | # This library is free software; you can redistribute it and/or modify it under |
| 7 | # the terms of the GNU Lesser General Public License as published by the Free |
| 8 | # Software Foundation; version 2.1 of the License. |
| 9 | # |
| 10 | # This library is distributed in the hope that it will be useful, but WITHOUT |
| 11 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 12 | # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
| 13 | # details. |
| 14 | # |
| 15 | # You should have received a copy of the GNU Lesser General Public License |
| 16 | # along with this library; if not, write to the Free Software Foundation, Inc., |
| 17 | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| 18 | |
| 19 | from __future__ import unicode_literals |
| 20 | from __future__ import print_function |
| 21 | from __future__ import division |
| 22 | import lttngust.debug as dbg |
| 23 | import lttngust.loghandler |
| 24 | import lttngust.cmd |
| 25 | from io import open |
| 26 | import threading |
| 27 | import logging |
| 28 | import socket |
| 29 | import time |
| 30 | import sys |
| 31 | import os |
| 32 | |
| 33 | |
| 34 | try: |
| 35 | # Python 2 |
| 36 | import Queue as queue |
| 37 | except ImportError: |
| 38 | # Python 3 |
| 39 | import queue |
| 40 | |
| 41 | |
# Values sent to the session daemon in the registration command built by
# _TcpClient._register(): the domain value first, then the major/minor
# version of the agent protocol spoken by this client.
_PROTO_DOMAIN = 5
_PROTO_MAJOR, _PROTO_MINOR = 2, 0
| 45 | |
| 46 | |
| 47 | def _get_env_value_ms(key, default_s): |
| 48 | try: |
| 49 | val = int(os.getenv(key, default_s * 1000)) / 1000 |
| 50 | except: |
| 51 | val = -1 |
| 52 | |
| 53 | if val < 0: |
| 54 | fmt = 'invalid ${} value; {} seconds will be used' |
| 55 | dbg._pwarning(fmt.format(key, default_s)) |
| 56 | val = default_s |
| 57 | |
| 58 | return val |
| 59 | |
| 60 | |
# How long (in seconds) to wait for the session daemon(s) to confirm
# registration, and how long (in seconds) a client sleeps before retrying
# a failed connection. Both can be overridden through environment
# variables holding a value in milliseconds.
_REG_TIMEOUT = _get_env_value_ms(key='LTTNG_UST_PYTHON_REGISTER_TIMEOUT',
                                 default_s=5)
_RETRY_REG_DELAY = _get_env_value_ms(
    key='LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', default_s=3)
| 63 | |
| 64 | |
| 65 | class _TcpClient(object): |
| 66 | def __init__(self, name, host, port, reg_queue): |
| 67 | super(self.__class__, self).__init__() |
| 68 | self._name = name |
| 69 | self._host = host |
| 70 | self._port = port |
| 71 | |
| 72 | try: |
| 73 | self._log_handler = lttngust.loghandler._Handler() |
| 74 | except (OSError) as e: |
| 75 | dbg._pwarning('cannot load library: {}'.format(e)) |
| 76 | raise e |
| 77 | |
| 78 | self._root_logger = logging.getLogger() |
| 79 | self._root_logger.setLevel(logging.NOTSET) |
| 80 | self._ref_count = 0 |
| 81 | self._sessiond_sock = None |
| 82 | self._reg_queue = reg_queue |
| 83 | self._server_cmd_handlers = { |
| 84 | lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done, |
| 85 | lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable, |
| 86 | lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable, |
| 87 | lttngust.cmd._ServerCmdList: self._handle_server_cmd_list, |
| 88 | } |
| 89 | |
| 90 | def _debug(self, msg): |
| 91 | return 'client "{}": {}'.format(self._name, msg) |
| 92 | |
| 93 | def run(self): |
| 94 | while True: |
| 95 | try: |
| 96 | # connect to the session daemon |
| 97 | dbg._pdebug(self._debug('connecting to session daemon')) |
| 98 | self._connect_to_sessiond() |
| 99 | |
| 100 | # register to the session daemon after a successful connection |
| 101 | dbg._pdebug(self._debug('registering to session daemon')) |
| 102 | self._register() |
| 103 | |
| 104 | # wait for commands from the session daemon |
| 105 | self._wait_server_cmd() |
| 106 | except (Exception) as e: |
| 107 | # Whatever happens here, we have to close the socket and |
| 108 | # retry to connect to the session daemon since either |
| 109 | # the socket was closed, a network timeout occured, or |
| 110 | # invalid data was received. |
| 111 | dbg._pdebug(self._debug('got exception: {}'.format(e))) |
| 112 | self._cleanup_socket() |
| 113 | dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY))) |
| 114 | time.sleep(_RETRY_REG_DELAY) |
| 115 | |
| 116 | def _recv_server_cmd_header(self): |
| 117 | data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE) |
| 118 | |
| 119 | if not data: |
| 120 | dbg._pdebug(self._debug('received empty server command header')) |
| 121 | return None |
| 122 | |
| 123 | assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE) |
| 124 | dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data)))) |
| 125 | |
| 126 | return lttngust.cmd._server_cmd_header_from_data(data) |
| 127 | |
| 128 | def _recv_server_cmd(self): |
| 129 | server_cmd_header = self._recv_server_cmd_header() |
| 130 | |
| 131 | if server_cmd_header is None: |
| 132 | return None |
| 133 | |
| 134 | dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size))) |
| 135 | dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id))) |
| 136 | dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version))) |
| 137 | data = bytes() |
| 138 | |
| 139 | if server_cmd_header.data_size > 0: |
| 140 | data = self._sessiond_sock.recv(server_cmd_header.data_size) |
| 141 | assert(len(data) == server_cmd_header.data_size) |
| 142 | |
| 143 | return lttngust.cmd._server_cmd_from_data(server_cmd_header, data) |
| 144 | |
| 145 | def _send_cmd_reply(self, cmd_reply): |
| 146 | data = cmd_reply.get_data() |
| 147 | dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data)))) |
| 148 | self._sessiond_sock.sendall(data) |
| 149 | |
| 150 | def _handle_server_cmd_reg_done(self, server_cmd): |
| 151 | dbg._pdebug(self._debug('got "registration done" server command')) |
| 152 | |
| 153 | if self._reg_queue is not None: |
| 154 | dbg._pdebug(self._debug('notifying _init_threads()')) |
| 155 | |
| 156 | try: |
| 157 | self._reg_queue.put(True) |
| 158 | except (Exception) as e: |
| 159 | # read side could be closed by now; ignore it |
| 160 | pass |
| 161 | |
| 162 | self._reg_queue = None |
| 163 | |
| 164 | def _handle_server_cmd_enable(self, server_cmd): |
| 165 | dbg._pdebug(self._debug('got "enable" server command')) |
| 166 | self._ref_count += 1 |
| 167 | |
| 168 | if self._ref_count == 1: |
| 169 | dbg._pdebug(self._debug('adding our handler to the root logger')) |
| 170 | self._root_logger.addHandler(self._log_handler) |
| 171 | |
| 172 | dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) |
| 173 | |
| 174 | return lttngust.cmd._ClientCmdReplyEnable() |
| 175 | |
| 176 | def _handle_server_cmd_disable(self, server_cmd): |
| 177 | dbg._pdebug(self._debug('got "disable" server command')) |
| 178 | self._ref_count -= 1 |
| 179 | |
| 180 | if self._ref_count < 0: |
| 181 | # disable command could be sent again when a session is destroyed |
| 182 | self._ref_count = 0 |
| 183 | |
| 184 | if self._ref_count == 0: |
| 185 | dbg._pdebug(self._debug('removing our handler from the root logger')) |
| 186 | self._root_logger.removeHandler(self._log_handler) |
| 187 | |
| 188 | dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) |
| 189 | |
| 190 | return lttngust.cmd._ClientCmdReplyDisable() |
| 191 | |
| 192 | def _handle_server_cmd_list(self, server_cmd): |
| 193 | dbg._pdebug(self._debug('got "list" server command')) |
| 194 | names = logging.Logger.manager.loggerDict.keys() |
| 195 | dbg._pdebug(self._debug('found {} loggers'.format(len(names)))) |
| 196 | cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names) |
| 197 | |
| 198 | return cmd_reply |
| 199 | |
| 200 | def _handle_server_cmd(self, server_cmd): |
| 201 | cmd_reply = None |
| 202 | |
| 203 | if server_cmd is None: |
| 204 | dbg._pdebug(self._debug('bad server command')) |
| 205 | status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD |
| 206 | cmd_reply = lttngust.cmd._ClientCmdReply(status) |
| 207 | elif type(server_cmd) in self._server_cmd_handlers: |
| 208 | cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd) |
| 209 | else: |
| 210 | dbg._pdebug(self._debug('unknown server command')) |
| 211 | status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD |
| 212 | cmd_reply = lttngust.cmd._ClientCmdReply(status) |
| 213 | |
| 214 | if cmd_reply is not None: |
| 215 | self._send_cmd_reply(cmd_reply) |
| 216 | |
| 217 | def _wait_server_cmd(self): |
| 218 | while True: |
| 219 | try: |
| 220 | server_cmd = self._recv_server_cmd() |
| 221 | except socket.timeout: |
| 222 | # simply retry here; the protocol has no KA and we could |
| 223 | # wait for hours |
| 224 | continue |
| 225 | |
| 226 | self._handle_server_cmd(server_cmd) |
| 227 | |
| 228 | def _cleanup_socket(self): |
| 229 | try: |
| 230 | self._sessiond_sock.shutdown(socket.SHUT_RDWR) |
| 231 | self._sessiond_sock.close() |
| 232 | except: |
| 233 | pass |
| 234 | |
| 235 | self._sessiond_sock = None |
| 236 | |
| 237 | def _connect_to_sessiond(self): |
| 238 | # create session daemon TCP socket |
| 239 | if self._sessiond_sock is None: |
| 240 | self._sessiond_sock = socket.socket(socket.AF_INET, |
| 241 | socket.SOCK_STREAM) |
| 242 | |
| 243 | # Use str(self._host) here. Since this host could be a string |
| 244 | # literal, and since we're importing __future__.unicode_literals, |
| 245 | # we want to make sure the host is a native string in Python 2. |
| 246 | # This avoids an indirect module import (unicode module to |
| 247 | # decode the unicode string, eventually imported by the |
| 248 | # socket module if needed), which is not allowed in a thread |
| 249 | # directly created by a module in Python 2 (our case). |
| 250 | # |
| 251 | # tl;dr: Do NOT remove str() here, or this call in Python 2 |
| 252 | # _will_ block on an interpreter's mutex until the waiting |
| 253 | # register queue timeouts. |
| 254 | self._sessiond_sock.connect((str(self._host), self._port)) |
| 255 | |
| 256 | def _register(self): |
| 257 | cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(), |
| 258 | _PROTO_MAJOR, _PROTO_MINOR) |
| 259 | data = cmd.get_data() |
| 260 | self._sessiond_sock.sendall(data) |
| 261 | |
| 262 | |
def _get_port_from_file(path):
    """Read a session daemon agent TCP port from the file at `path`.

    The first line of the file must contain the port number. Returns the
    port as an int, or None when the file cannot be opened/read/parsed or
    when the value is outside the valid TCP port range (1 to 65535).
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # `with` closes the file even when int() rejects the line.
        with open(path) as f:
            port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file, undecodable or non-integer content
        return None

    # Both bounds must hold; 0, negative and too-large values are rejected.
    if 0 < port <= 65535:
        return port

    return None
| 278 | |
| 279 | |
| 280 | def _get_user_home_path(): |
| 281 | # $LTTNG_HOME overrides $HOME if it exists |
| 282 | return os.getenv('LTTNG_HOME', os.path.expanduser('~')) |
| 283 | |
| 284 | |
# Address the client threads connect to (the local host).
_SESSIOND_HOST = '127.0.0.1'

# Set by _init_threads() so that the agent starts its threads only once.
_initialized = False
| 287 | |
| 288 | |
def _client_thread_target(name, port, reg_queue):
    """Thread body: build a TCP client for the local session daemon and run it.

    `name` labels the client, `port` is the session daemon agent port and
    `reg_queue` is handed to the client to report registration.
    """
    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
    tcp_client = _TcpClient(name=name, host=_SESSIOND_HOST, port=port,
                            reg_queue=reg_queue)
    dbg._pdebug('starting client "{}"'.format(name))
    tcp_client.run()
| 294 | |
| 295 | |
def _start_client_thread(name, port, reg_queue):
    # Start one daemon thread named `name` running a client for `port`.
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _wait_for_registration(reg_queue, reg_expecting, timeout):
    # Block until `reg_expecting` registration confirmations were read from
    # `reg_queue` or until `timeout` seconds of wall-clock time elapsed.
    #
    # Wall-clock source: time.clock() measured CPU time on Unix (so the
    # remaining budget never shrank while blocked) and was removed in
    # Python 3.8. time.monotonic() does not exist in Python 2.
    clock = getattr(time, 'monotonic', time.time)
    deadline = clock() + timeout
    cur_timeout = timeout

    while True:
        dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                           cur_timeout))

        try:
            reg_queue.get(timeout=cur_timeout)
        except queue.Empty:
            dbg._pdebug('ran out of time')
            return

        reg_expecting -= 1
        dbg._pdebug('unblocked')

        if reg_expecting == 0:
            # done!
            dbg._pdebug('successfully registered to session daemon(s)')
            return

        cur_timeout = deadline - clock()

        if cur_timeout <= 0:
            # timeout
            dbg._pdebug('ran out of time')
            return


def _init_threads():
    """Start the agent's client threads, once, and wait for registration.

    One client thread is started per valid session daemon port (system
    and/or user). The call then blocks for at most _REG_TIMEOUT seconds
    while waiting for each started client to confirm its registration. On
    timeout the client threads keep trying in the background, but logging
    statements issued meanwhile may not reach LTTng-UST.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    _wait_for_registration(reg_queue, reg_expecting, _REG_TIMEOUT)

    dbg._pdebug('leaving')
| 393 | |
| 394 | |
# Start the agent as a side effect of importing this module. This call may
# block while _init_threads() waits for the session daemon(s) to confirm
# registration (bounded by the _REG_TIMEOUT budget).
_init_threads()