1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
4 # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
6 # This library is free software; you can redistribute it and/or modify it under
7 # the terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation; version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with this library; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 from __future__
import unicode_literals
20 from __future__
import print_function
21 from __future__
import division
22 import lttngust
.debug
as dbg
23 import lttngust
.loghandler
47 def _get_env_value_ms(key
, default_s
):
49 val
= int(os
.getenv(key
, default_s
* 1000)) / 1000
54 fmt
= 'invalid ${} value; {} seconds will be used'
55 dbg
._pwarning
(fmt
.format(key
, default_s
))
# Initial time budget, in seconds, used as the timeout when waiting on
# the registration queue. $LTTNG_UST_PYTHON_REGISTER_TIMEOUT (in
# milliseconds) overrides the default of 5 s.
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)

# Time, in seconds, slept by a client after a failure before it tries
# again. $LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY (in milliseconds)
# overrides the default of 3 s.
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
65 class _TcpClient(object):
def __init__(self, name, host, port, reg_queue):
    """Prepare a client for one session daemon, without connecting yet.

    name: label used to prefix this client's debug messages.
    host, port: address used later by _connect_to_sessiond().
    reg_queue: queue on which True is put once the session daemon
        confirms the registration.

    An OSError raised while creating the LTTng-UST log handler is
    reported with a warning and propagated to the caller.
    """
    # Name the class explicitly: super(self.__class__, self) recurses
    # forever as soon as this class is subclassed.
    super(_TcpClient, self).__init__()
    self._name = name
    self._host = host
    self._port = port

    try:
        self._log_handler = lttngust.loghandler._Handler()
    except (OSError) as e:
        dbg._pwarning('cannot load library: {}'.format(e))
        raise

    # NOTSET on the root logger lets records of every level reach the
    # handlers attached to it.
    self._root_logger = logging.getLogger()
    self._root_logger.setLevel(logging.NOTSET)

    # number of "enable" commands not yet balanced by a "disable"
    self._ref_count = 0
    self._sessiond_sock = None
    self._reg_queue = reg_queue

    # server command class -> method handling it
    self._server_cmd_handlers = {
        lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
        lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
        lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
        lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
    }
90 def _debug(self
, msg
):
91 return 'client "{}": {}'.format(self
._name
, msg
)
96 # connect to the session daemon
97 dbg
._pdebug
(self
._debug
('connecting to session daemon'))
98 self
._connect
_to
_sessiond
()
100 # register to the session daemon after a successful connection
101 dbg
._pdebug
(self
._debug
('registering to session daemon'))
104 # wait for commands from the session daemon
105 self
._wait
_server
_cmd
()
106 except (Exception) as e
:
107 # Whatever happens here, we have to close the socket and
108 # retry to connect to the session daemon since either
# the socket was closed, a network timeout occurred, or
110 # invalid data was received.
111 dbg
._pdebug
(self
._debug
('got exception: {}'.format(e
)))
112 self
._cleanup
_socket
()
113 dbg
._pdebug
(self
._debug
('sleeping for {} s'.format(_RETRY_REG_DELAY
)))
114 time
.sleep(_RETRY_REG_DELAY
)
116 def _recv_server_cmd_header(self
):
117 data
= self
._sessiond
_sock
.recv(lttngust
.cmd
._SERVER
_CMD
_HEADER
_SIZE
)
120 dbg
._pdebug
(self
._debug
('received empty server command header'))
123 assert(len(data
) == lttngust
.cmd
._SERVER
_CMD
_HEADER
_SIZE
)
124 dbg
._pdebug
(self
._debug
('received server command header ({} bytes)'.format(len(data
))))
126 return lttngust
.cmd
._server
_cmd
_header
_from
_data
(data
)
128 def _recv_server_cmd(self
):
129 server_cmd_header
= self
._recv
_server
_cmd
_header
()
131 if server_cmd_header
is None:
134 dbg
._pdebug
(self
._debug
('server command header: data size: {} bytes'.format(server_cmd_header
.data_size
)))
135 dbg
._pdebug
(self
._debug
('server command header: command ID: {}'.format(server_cmd_header
.cmd_id
)))
136 dbg
._pdebug
(self
._debug
('server command header: command version: {}'.format(server_cmd_header
.cmd_version
)))
139 if server_cmd_header
.data_size
> 0:
140 data
= self
._sessiond
_sock
.recv(server_cmd_header
.data_size
)
141 assert(len(data
) == server_cmd_header
.data_size
)
143 return lttngust
.cmd
._server
_cmd
_from
_data
(server_cmd_header
, data
)
def _send_cmd_reply(self, cmd_reply):
    """Serialize `cmd_reply` and send all of it to the session daemon."""
    payload = cmd_reply.get_data()
    size_msg = 'sending command reply ({} bytes)'.format(len(payload))
    dbg._pdebug(self._debug(size_msg))
    self._sessiond_sock.sendall(payload)
def _handle_server_cmd_reg_done(self, server_cmd):
    """Handle "registration done": notify the registration queue once.

    Returns nothing, so no reply is sent for this command.
    """
    dbg._pdebug(self._debug('got "registration done" server command'))

    if self._reg_queue is None:
        # already notified
        return

    dbg._pdebug(self._debug('notifying _init_threads()'))

    try:
        self._reg_queue.put(True)
    except Exception:
        # read side could be closed by now; ignore it
        pass

    # drop the queue so that only one notification is ever sent
    self._reg_queue = None
def _handle_server_cmd_enable(self, server_cmd):
    """Handle "enable": count it and attach our handler on the first one.

    Returns the reply to send back to the session daemon.
    """
    dbg._pdebug(self._debug('got "enable" server command'))
    self._ref_count += 1
    is_first_enable = (self._ref_count == 1)

    if is_first_enable:
        dbg._pdebug(self._debug('adding our handler to the root logger'))
        self._root_logger.addHandler(self._log_handler)

    ref_count_msg = 'ref count is {}'.format(self._ref_count)
    dbg._pdebug(self._debug(ref_count_msg))

    return lttngust.cmd._ClientCmdReplyEnable()
def _handle_server_cmd_disable(self, server_cmd):
    """Handle "disable": uncount it and detach our handler when unused.

    Returns the reply to send back to the session daemon.
    """
    dbg._pdebug(self._debug('got "disable" server command'))

    # disable command could be sent again when a session is destroyed,
    # so never let the count drop below zero
    self._ref_count = max(self._ref_count - 1, 0)

    if self._ref_count == 0:
        dbg._pdebug(self._debug('removing our handler from the root logger'))
        self._root_logger.removeHandler(self._log_handler)

    ref_count_msg = 'ref count is {}'.format(self._ref_count)
    dbg._pdebug(self._debug(ref_count_msg))

    return lttngust.cmd._ClientCmdReplyDisable()
def _handle_server_cmd_list(self, server_cmd):
    """Handle "list": reply with the names of the currently known loggers.

    Returns the reply to send back to the session daemon.
    """
    dbg._pdebug(self._debug('got "list" server command'))

    # Take a snapshot: other threads may create loggers at any time,
    # and iterating a live view of a dict that changes size raises
    # RuntimeError in Python 3.
    names = list(logging.Logger.manager.loggerDict)
    dbg._pdebug(self._debug('found {} loggers'.format(len(names))))

    return lttngust.cmd._ClientCmdReplyList(names=names)
def _handle_server_cmd(self, server_cmd):
    """Dispatch `server_cmd` to its handler and send the reply, if any.

    A missing (None) command or one of an unregistered type is answered
    with an "invalid command" reply.
    """
    handler = None

    if server_cmd is None:
        dbg._pdebug(self._debug('bad server command'))
    else:
        handler = self._server_cmd_handlers.get(type(server_cmd))

        if handler is None:
            dbg._pdebug(self._debug('unknown server command'))

    if handler is None:
        status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
        cmd_reply = lttngust.cmd._ClientCmdReply(status)
    else:
        cmd_reply = handler(server_cmd)

    # some handlers have nothing to answer
    if cmd_reply is not None:
        self._send_cmd_reply(cmd_reply)
def _wait_server_cmd(self):
    """Receive and handle server commands until an error is raised."""
    while True:
        try:
            server_cmd = self._recv_server_cmd()
        except socket.timeout:
            # simply retry here; the protocol has no keep-alive, so a
            # long silence from the session daemon is not an error
            pass
        else:
            self._handle_server_cmd(server_cmd)
228 def _cleanup_socket(self
):
230 self
._sessiond
_sock
.shutdown(socket
.SHUT_RDWR
)
231 self
._sessiond
_sock
.close()
235 self
._sessiond
_sock
= None
def _connect_to_sessiond(self):
    """Connect the session daemon TCP socket, creating it when needed."""
    # create session daemon TCP socket
    if self._sessiond_sock is None:
        self._sessiond_sock = socket.socket(socket.AF_INET,
                                            socket.SOCK_STREAM)

    # Use str(self._host) here. Since this host could be a string
    # literal, and since we're importing __future__.unicode_literals,
    # we want to make sure the host is a native string in Python 2.
    # This avoids an indirect module import (unicode module to
    # decode the unicode string, eventually imported by the
    # socket module if needed), which is not allowed in a thread
    # directly created by a module in Python 2 (our case).
    #
    # tl;dr: Do NOT remove str() here, or this call in Python 2
    # _will_ block on an interpreter's mutex until the waiting
    # register queue times out.
    address = (str(self._host), self._port)
    self._sessiond_sock.connect(address)
257 cmd
= lttngust
.cmd
._ClientRegisterCmd
(_PROTO_DOMAIN
, os
.getpid(),
258 _PROTO_MAJOR
, _PROTO_MINOR
)
259 data
= cmd
.get_data()
260 self
._sessiond
_sock
.sendall(data
)
def _get_port_from_file(path):
    """Read a TCP port number from the first line of the file at `path`.

    Returns the port as an int when it lies within 1..65535. Returns
    None when the file cannot be read, when its first line is not an
    integer, or when the number is out of range.
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # the with statement closes the file even when int() fails
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file or garbage content: no port
        return None

    # Both bounds must hold; the previous "or" accepted any integer.
    if 0 < r_port <= 65535:
        return r_port

    return None
280 def _get_user_home_path():
281 # $LTTNG_HOME overrides $HOME if it exists
282 return os
.getenv('LTTNG_HOME', os
.path
.expanduser('~'))
# Host passed to every _TcpClient created by _client_thread_target():
# only the port differs from one session daemon to another.
_SESSIOND_HOST = '127.0.0.1'
289 def _client_thread_target(name
, port
, reg_queue
):
290 dbg
._pdebug
('creating client "{}" using TCP port {}'.format(name
, port
))
291 client
= _TcpClient(name
, _SESSIOND_HOST
, port
, reg_queue
)
292 dbg
._pdebug
('starting client "{}"'.format(name
))
299 dbg
._pdebug
('entering')
302 dbg
._pdebug
('agent is already initialized')
305 # This makes sure that the appropriate modules for encoding and
306 # decoding strings/bytes are imported now, since no import should
307 # happen within a thread at import time (our case).
308 'lttng'.encode().decode()
311 sys_port
= _get_port_from_file('/var/run/lttng/agent.port')
312 user_port_file
= os
.path
.join(_get_user_home_path(), '.lttng', 'agent.port')
313 user_port
= _get_port_from_file(user_port_file
)
314 reg_queue
= queue
.Queue()
317 dbg
._pdebug
('system session daemon port: {}'.format(sys_port
))
318 dbg
._pdebug
('user session daemon port: {}'.format(user_port
))
320 if sys_port
== user_port
and sys_port
is not None:
321 # The two session daemon ports are the same. This is not normal.
322 # Connect to only one.
323 dbg
._pdebug
('both user and system session daemon have the same port')
327 if sys_port
is not None:
328 dbg
._pdebug
('creating system client thread')
329 t
= threading
.Thread(target
=_client_thread_target
,
330 args
=('system', sys_port
, reg_queue
))
334 dbg
._pdebug
('created and started system client thread')
337 if user_port
is not None:
338 dbg
._pdebug
('creating user client thread')
339 t
= threading
.Thread(target
=_client_thread_target
,
340 args
=('user', user_port
, reg_queue
))
344 dbg
._pdebug
('created and started user client thread')
347 # cannot create threads for some reason; stop this initialization
348 dbg
._pwarning
('cannot create client threads')
351 if reg_expecting
== 0:
352 # early exit: looks like there's not even one valid port
353 dbg
._pwarning
('no valid LTTng session daemon port found (is the session daemon started?)')
356 cur_timeout
= _REG_TIMEOUT
358 # We block here to make sure the agent is properly registered to
359 # the session daemon. If we timeout, the client threads will still
360 # continue to try to connect and register to the session daemon,
361 # but there is no guarantee that all following logging statements
362 # will make it to LTTng-UST.
364 # When a client thread receives a "registration done" confirmation
365 # from the session daemon it's connected to, it puts True in
369 dbg
._pdebug
('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting
,
372 reg_queue
.get(timeout
=cur_timeout
)
375 dbg
._pdebug
('unblocked')
377 if reg_expecting
== 0:
379 dbg
._pdebug
('successfully registered to session daemon(s)')
382 cur_timeout
-= (t2
- t1
)
386 dbg
._pdebug
('ran out of time')
389 dbg
._pdebug
('ran out of time')
392 dbg
._pdebug
('leaving')