1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
4 # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
6 # This library is free software; you can redistribute it and/or modify it under
7 # the terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation; version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with this library; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 from __future__
import unicode_literals
20 from __future__
import print_function
21 from __future__
import division
22 import lttngust
.debug
as dbg
23 import lttngust
.loghandler
24 import lttngust
.compat
def _get_env_value_ms(key, default_s):
    """Return the duration, in seconds, configured by environment variable *key*.

    The variable is interpreted as an integer number of milliseconds and
    converted to seconds. When the variable is not set, *default_s* (a
    number of seconds) is used.

    If the value cannot be parsed as an integer, or is negative, a
    warning is printed through ``dbg._pwarning()`` and *default_s* is
    returned instead.
    """
    try:
        # The fallback is expressed in milliseconds so that both paths
        # go through the same conversion below.
        val = int(os.getenv(key, default_s * 1000)) / 1000
    except (TypeError, ValueError):
        # Not an integer: handled by the common "invalid value" path.
        val = -1

    if val < 0:
        # A negative duration is meaningless for the consumers of this
        # value (queue timeouts and sleep delays), so reject it as well.
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
# Time budget, in seconds, used as the timeout when waiting on the
# registration queue (overridable with $LTTNG_UST_PYTHON_REGISTER_TIMEOUT,
# expressed in milliseconds).
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)

# Pause, in seconds, a client observes after a failure before it retries
# (overridable with $LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY, expressed in
# milliseconds).
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
class _TcpClient(object):
    """TCP client talking to one LTTng session daemon.

    The client connects to the session daemon, registers itself, then
    serves the commands sent by the daemon (registration done, enable,
    disable, list). The LTTng logging handler is attached to the root
    logger while at least one "enable" command is outstanding.
    """

    def __init__(self, name, host, port, reg_queue):
        """Create a client named *name* for the daemon at *host*:*port*.

        *reg_queue* receives ``True`` once, when the session daemon
        confirms the registration.

        Raises ``OSError`` (after printing a warning) when the LTTng log
        handler cannot be created.
        """
        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this class is subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except OSError as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            raise

        self._root_logger = logging.getLogger()
        self._root_logger.setLevel(logging.NOTSET)

        # number of outstanding "enable" commands
        self._ref_count = 0
        self._sessiond_sock = None
        self._reg_queue = reg_queue

        # server command type -> handler returning a reply (or None)
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        """Return *msg* prefixed with this client's name."""
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and serve commands forever.

        Any failure closes the socket, waits ``_RETRY_REG_DELAY`` seconds
        and starts over.
        """
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except Exception as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exact(self, size):
        """Read up to *size* bytes from the session daemon socket.

        A single ``recv()`` may legitimately return fewer bytes than
        requested, so keep reading until *size* bytes are gathered.
        Fewer bytes are returned only when the peer closes the
        connection first.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            chunk = self._sessiond_sock.recv(remaining)

            if not chunk:
                # peer closed the connection
                break

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        """Receive and decode a server command header.

        Returns ``None`` when no data at all was received. Raises
        ``IOError`` when the header is truncated.
        """
        header_size = lttngust.cmd._SERVER_CMD_HEADER_SIZE
        data = self._recv_exact(header_size)

        if not data:
            dbg._pdebug(self._debug('received empty server command header'))
            return None

        if len(data) != header_size:
            # Explicit check instead of an assert, which is stripped
            # when Python runs with -O.
            raise IOError('truncated server command header')

        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive one full server command (header and payload).

        Returns ``None`` when no header could be received. Raises
        ``IOError`` when the payload is truncated.
        """
        server_cmd_header = self._recv_server_cmd_header()

        if server_cmd_header is None:
            return None

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
        data = b''

        if server_cmd_header.data_size > 0:
            data = self._recv_exact(server_cmd_header.data_size)

            if len(data) != server_cmd_header.data_size:
                raise IOError('truncated server command payload')

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        """Serialize *cmd_reply* and send it to the session daemon."""
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        """Handle "registration done": notify the registration queue once.

        Returns ``None``: no reply is sent for this command.
        """
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            # only the first confirmation is reported
            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        """Handle "enable": attach our handler on the first enable."""
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        """Handle "disable": detach our handler when nothing is enabled."""
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        """Handle "list": reply with the names of the known loggers."""
        dbg._pdebug(self._debug('got "list" server command'))

        # Snapshot the names: on Python 3, keys() is a live view which
        # breaks if another thread creates a logger while it is iterated.
        names = list(logging.Logger.manager.loggerDict.keys())
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)

        return cmd_reply

    def _handle_server_cmd(self, server_cmd):
        """Dispatch *server_cmd* to its handler and send the reply, if any.

        A missing (``None``) or unknown command is answered with an
        "invalid command" status.
        """
        cmd_reply = None

        if server_cmd is None:
            dbg._pdebug(self._debug('bad server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        elif type(server_cmd) in self._server_cmd_handlers:
            cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
        else:
            dbg._pdebug(self._debug('unknown server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        """Receive and handle server commands until an error occurs."""
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no keep-alive, so
                # a long silence from the session daemon is not an error
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Shut down and close the session daemon socket, ignoring errors."""
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            finally:
                # always release the descriptor, even when shutdown()
                # fails (e.g. the socket was never connected)
                sock.close()
        except socket.error:
            pass

    def _connect_to_sessiond(self):
        """Create the TCP socket if needed and connect to the session daemon."""
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue times out.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        """Send the registration command to the session daemon."""
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
def _get_port_from_file(path):
    """Return the TCP port written on the first line of file *path*.

    Returns ``None`` when the file cannot be read, when its first line
    is not an integer, or when the integer is not a valid TCP port
    (1 to 65535).
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # the context manager closes the file even if parsing fails
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file or non-numeric content: no usable port
        return None

    # Both bounds must hold; joined with `or`, the test accepted every
    # integer, including 0 and negative values.
    if 0 < r_port <= 65535:
        return r_port

    return None
def _get_user_home_path():
    """Return the directory used as the user's LTTng home.

    This is the value of $LTTNG_HOME when that variable is set, and the
    expansion of ``~`` otherwise.
    """
    fallback_home = os.path.expanduser('~')

    # $LTTNG_HOME overrides $HOME if it exists
    return os.environ.get('LTTNG_HOME', fallback_home)
# Address passed as the host of every client created by
# _client_thread_target(): session daemons are reached over the IPv4
# loopback interface.
_SESSIOND_HOST = '127.0.0.1'
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: create the client named *name* and run it.

    The client targets ``_SESSIOND_HOST`` on TCP port *port* and is
    given *reg_queue* to report its registration.
    """
    creating_msg = 'creating client "{}" using TCP port {}'.format(name, port)
    dbg._pdebug(creating_msg)
    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)

    starting_msg = 'starting client "{}"'.format(name)
    dbg._pdebug(starting_msg)
    tcp_client.run()
300 dbg
._pdebug
('entering')
303 dbg
._pdebug
('agent is already initialized')
306 # This makes sure that the appropriate modules for encoding and
307 # decoding strings/bytes are imported now, since no import should
308 # happen within a thread at import time (our case).
309 'lttng'.encode().decode()
312 sys_port
= _get_port_from_file('/var/run/lttng/agent.port')
313 user_port_file
= os
.path
.join(_get_user_home_path(), '.lttng', 'agent.port')
314 user_port
= _get_port_from_file(user_port_file
)
315 reg_queue
= queue
.Queue()
318 dbg
._pdebug
('system session daemon port: {}'.format(sys_port
))
319 dbg
._pdebug
('user session daemon port: {}'.format(user_port
))
321 if sys_port
== user_port
and sys_port
is not None:
322 # The two session daemon ports are the same. This is not normal.
323 # Connect to only one.
324 dbg
._pdebug
('both user and system session daemon have the same port')
328 if sys_port
is not None:
329 dbg
._pdebug
('creating system client thread')
330 t
= threading
.Thread(target
=_client_thread_target
,
331 args
=('system', sys_port
, reg_queue
))
335 dbg
._pdebug
('created and started system client thread')
338 if user_port
is not None:
339 dbg
._pdebug
('creating user client thread')
340 t
= threading
.Thread(target
=_client_thread_target
,
341 args
=('user', user_port
, reg_queue
))
345 dbg
._pdebug
('created and started user client thread')
348 # cannot create threads for some reason; stop this initialization
349 dbg
._pwarning
('cannot create client threads')
352 if reg_expecting
== 0:
353 # early exit: looks like there's not even one valid port
354 dbg
._pwarning
('no valid LTTng session daemon port found (is the session daemon started?)')
357 cur_timeout
= _REG_TIMEOUT
359 # We block here to make sure the agent is properly registered to
360 # the session daemon. If we timeout, the client threads will still
361 # continue to try to connect and register to the session daemon,
362 # but there is no guarantee that all following logging statements
363 # will make it to LTTng-UST.
365 # When a client thread receives a "registration done" confirmation
366 # from the session daemon it's connected to, it puts True in
370 dbg
._pdebug
('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting
,
372 t1
= lttngust
.compat
._clock
()
373 reg_queue
.get(timeout
=cur_timeout
)
374 t2
= lttngust
.compat
._clock
()
376 dbg
._pdebug
('unblocked')
378 if reg_expecting
== 0:
380 dbg
._pdebug
('successfully registered to session daemon(s)')
383 cur_timeout
-= (t2
- t1
)
387 dbg
._pdebug
('ran out of time')
390 dbg
._pdebug
('ran out of time')
393 dbg
._pdebug
('leaving')