2 * Copyright (c) - 2013 Julien Desfossez <jdesfossez@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; only version 2 of the License.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
26 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
37 #include <lttng/lttng.h>
39 #include <urcu/list.h>
40 #include <bin/lttng-sessiond/session.h>
41 #include <common/common.h>
43 #include <bin/lttng-relayd/lttng-viewer-abi.h>
44 #include <common/index/ctf-index.h>
46 #include <common/compat/endian.h>
/*
 * Test configuration values. None of these three is referenced in the
 * visible part of this file.
 */
48 #define SESSION1 "test1"
49 #define RELAYD_URL "net://localhost"
50 #define LIVE_TIMER 2000000
52 /* Number of TAP tests in this file */
/*
 * Byte length of the anonymous mapping attach_session() creates for each
 * stream; get_data_packet() refuses packets longer than this.
 */
54 #define mmap_size 524288
/*
 * Not referenced in the visible part of this file.
 * NOTE(review): presumably defined to satisfy symbols expected by linked
 * project objects -- confirm.
 */
56 int ust_consumerd32_fd
;
57 int ust_consumerd64_fd
;
/*
 * Socket created and connected by connect_viewer(); every viewer command in
 * this file is sent and answered on it.
 */
59 static int control_sock
;
/* Session state allocated and filled in by attach_session(). */
60 struct live_session
*session
;
/*
 * Offset, length and stream index recorded by get_next_index(); main() passes
 * them to get_data_packet(). Being static, they start at zero.
 */
62 static int first_packet_offset
;
63 static int first_packet_len
;
64 static int first_packet_stream_id
;
/*
 * Per-stream state kept by the test client. Only ctf_trace_id is visible
 * here; other members (id, mmap_base, fd, metadata_flag, first_read) are
 * accessed elsewhere in this file.
 */
66 struct viewer_stream
{
68 uint64_t ctf_trace_id
;
/*
 * NOTE(review): streams and stream_count are accessed through the global
 * `session` (a struct live_session pointer), so the fields below presumably
 * belong to struct live_session, whose opening line is not visible here --
 * confirm.
 */
/* Array of stream_count entries, allocated in attach_session(). */
77 struct viewer_stream
*streams
;
78 uint64_t live_timer_interval
;
/* Number of streams announced by the attach response. */
79 uint64_t stream_count
;
/*
 * Receive data from socket `fd` into `buf`, aiming for `len` bytes.
 *
 * recv() is called repeatedly, writing at buf + copied and asking for the
 * to_copy bytes still outstanding. The loop continues while the last call
 * returned data and bytes remain, or while it failed with EINTR.
 *
 * NOTE(review): the statements that update copied/to_copy and the return
 * statement are not visible in this listing.
 */
83 ssize_t
lttng_live_recv(int fd
, void *buf
, size_t len
)
86 size_t copied
= 0, to_copy
= len
;
/* NOTE(review): arithmetic on a `void *` relies on a compiler extension. */
89 ret
= recv(fd
, buf
+ copied
, to_copy
, 0);
/* A single recv() must never return more than was requested. */
91 assert(ret
<= to_copy
);
95 } while ((ret
> 0 && to_copy
> 0)
96 || (ret
< 0 && errno
== EINTR
));
99 /* ret = 0 means orderly shutdown, ret < 0 is error. */
/*
 * Send `len` bytes from `buf` on socket `fd`.
 *
 * send() is called with MSG_NOSIGNAL and retried while it fails with EINTR.
 * The visible loop does not retry after a short write.
 *
 * NOTE(review): the return statement is not visible in this listing; callers
 * store the result in ret_len.
 */
104 ssize_t
lttng_live_send(int fd
, const void *buf
, size_t len
)
109 ret
= send(fd
, buf
, len
, MSG_NOSIGNAL
);
110 } while (ret
< 0 && errno
== EINTR
);
/*
 * Resolve `hostname`, create a TCP/IPv4 socket in the global control_sock and
 * connect it to port 5344 of that host.
 *
 * main() treats a return value of 0 as success; the return statements are not
 * visible in this listing.
 */
115 int connect_viewer(char *hostname
)
117 struct hostent
*host
;
118 struct sockaddr_in server_addr
;
/*
 * NOTE(review): no check of `host` is visible before it is dereferenced
 * below -- confirm one exists in the omitted lines.
 */
121 host
= gethostbyname(hostname
);
127 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
/* First address: port 5344, the one control_sock is connected to. */
133 server_addr
.sin_family
= AF_INET
;
134 server_addr
.sin_port
= htons(5344);
135 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
136 bzero(&(server_addr
.sin_zero
), 8);
/*
 * NOTE(review): the length passed is sizeof(struct sockaddr), not
 * sizeof(server_addr) -- confirm the two are interchangeable on the
 * supported targets.
 */
138 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
139 sizeof(struct sockaddr
)) == -1) {
/*
 * server_addr is filled in again for port 5345; what is done with it
 * afterwards is not visible in this listing.
 */
145 server_addr
.sin_family
= AF_INET
;
146 server_addr
.sin_port
= htons(5345);
147 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
148 bzero(&(server_addr
.sin_zero
), 8);
/*
 * Perform the viewer handshake on control_sock: send an LTTNG_VIEWER_CONNECT
 * command header, then a struct lttng_viewer_connect carrying VERSION_MAJOR,
 * VERSION_MINOR and the LTTNG_VIEWER_CLIENT_COMMAND type, then read the
 * peer's reply back into the same struct.
 *
 * main() treats a return value of 0 as success; the checks guarding each
 * error message and the return statements are not visible in this listing.
 */
156 int establish_connection(void)
158 struct lttng_viewer_cmd cmd
;
/* NOTE(review): this local shadows connect(2) within the function. */
159 struct lttng_viewer_connect connect
;
162 cmd
.cmd
= htobe32(LTTNG_VIEWER_CONNECT
);
/*
 * NOTE(review): data_size is stored in host byte order while cmd.cmd is
 * converted with htobe32 -- confirm which byte order the peer expects.
 */
163 cmd
.data_size
= sizeof(connect
);
166 memset(&connect
, 0, sizeof(connect
));
167 connect
.major
= htobe32(VERSION_MAJOR
);
168 connect
.minor
= htobe32(VERSION_MINOR
);
169 connect
.type
= htobe32(LTTNG_VIEWER_CLIENT_COMMAND
);
/* Command header first, payload second. */
171 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
173 fprintf(stderr
, "Error sending cmd\n");
176 ret_len
= lttng_live_send(control_sock
, &connect
, sizeof(connect
));
178 fprintf(stderr
, "Error sending version\n");
/* The reply overwrites the request structure. */
182 ret_len
= lttng_live_recv(control_sock
, &connect
, sizeof(connect
));
184 fprintf(stderr
, "[error] Remote side has closed connection\n");
188 fprintf(stderr
, "Error receiving version\n");
198 * Returns the number of sessions, should be 1 during the unit test.
/*
 * Send LTTNG_VIEWER_LIST_SESSIONS, read the list header, then read one
 * struct lttng_viewer_session per announced session.
 *
 * The id of the first session whose `streams` field is non-zero is written
 * to *session_id. The visible success path returns the announced session
 * count converted from big endian.
 */
200 int list_sessions(int *session_id
)
202 struct lttng_viewer_cmd cmd
;
203 struct lttng_viewer_list_sessions list
;
204 struct lttng_viewer_session lsession
;
207 int first_session
= 0;
209 cmd
.cmd
= htobe32(LTTNG_VIEWER_LIST_SESSIONS
);
213 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
215 fprintf(stderr
, "Error sending cmd\n");
219 ret_len
= lttng_live_recv(control_sock
, &list
, sizeof(list
));
221 fprintf(stderr
, "[error] Remote side has closed connection\n");
225 fprintf(stderr
, "Error receiving session list\n");
229 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
230 ret_len
= lttng_live_recv(control_sock
, &lsession
, sizeof(lsession
));
232 fprintf(stderr
, "Error receiving session\n");
/*
 * NOTE(review): lsession.streams is tested without byte-order
 * conversion, and `first_session <= 0` cannot tell "nothing chosen yet"
 * from a session whose id is 0 -- confirm both are intended.
 */
235 if (lsession
.streams
> 0 && first_session
<= 0) {
236 first_session
= be64toh(lsession
.id
);
237 *session_id
= first_session
;
241 return be32toh(list
.sessions_count
);
/*
 * Send LTTNG_VIEWER_CREATE_SESSION on control_sock and read the
 * struct lttng_viewer_create_session_response reply. A status other than
 * LTTNG_VIEWER_CREATE_SESSION_OK is reported on stderr.
 *
 * main() treats a return value of 0 as success; the return statements are not
 * visible in this listing.
 */
247 int create_viewer_session(void)
249 struct lttng_viewer_cmd cmd
;
250 struct lttng_viewer_create_session_response resp
;
253 cmd
.cmd
= htobe32(LTTNG_VIEWER_CREATE_SESSION
);
257 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
259 fprintf(stderr
, "[error] Error sending cmd\n");
/* The whole command header must have been written. */
262 assert(ret_len
== sizeof(cmd
));
264 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
266 fprintf(stderr
, "[error] Remote side has closed connection\n");
270 fprintf(stderr
, "[error] Error receiving create session reply\n");
/* The whole response must have been read. */
273 assert(ret_len
== sizeof(resp
));
275 if (be32toh(resp
.status
) != LTTNG_VIEWER_CREATE_SESSION_OK
) {
276 fprintf(stderr
, "[error] Error creating viewer session\n");
/*
 * Attach to session `id`, seeking to the beginning, and record its streams.
 *
 * Allocates the global `session`, sends LTTNG_VIEWER_ATTACH_SESSION followed
 * by the request, reads the response, then reads one
 * struct lttng_viewer_stream per announced stream into session->streams.
 * Each stream gets an anonymous private mapping of mmap_size bytes that
 * get_data_packet() later receives into.
 *
 * The visible success path returns session->stream_count. The error branches
 * are not visible in this listing.
 */
285 int attach_session(int id
)
287 struct lttng_viewer_cmd cmd
;
288 struct lttng_viewer_attach_session_request rq
;
289 struct lttng_viewer_attach_session_response rp
;
290 struct lttng_viewer_stream stream
;
294 session
= zmalloc(sizeof(struct live_session
));
299 cmd
.cmd
= htobe32(LTTNG_VIEWER_ATTACH_SESSION
);
/*
 * NOTE(review): data_size is stored in host byte order while cmd.cmd is
 * converted with htobe32 -- confirm which byte order the peer expects.
 */
300 cmd
.data_size
= sizeof(rq
);
303 memset(&rq
, 0, sizeof(rq
));
304 rq
.session_id
= htobe64(id
);
305 rq
.seek
= htobe32(LTTNG_VIEWER_SEEK_BEGINNING
);
/* Command header first, request payload second. */
307 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
309 fprintf(stderr
, "Error sending cmd\n");
312 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
314 fprintf(stderr
, "Error sending attach request\n");
318 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
320 fprintf(stderr
, "[error] Remote side has closed connection\n");
324 fprintf(stderr
, "Error receiving attach response\n");
327 if (be32toh(rp
.status
) != LTTNG_VIEWER_ATTACH_OK
) {
/* One zero-initialised viewer_stream per announced stream. */
331 session
->stream_count
= be32toh(rp
.streams_count
);
332 session
->streams
= zmalloc(session
->stream_count
*
333 sizeof(struct viewer_stream
));
334 if (!session
->streams
) {
338 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
339 ret_len
= lttng_live_recv(control_sock
, &stream
, sizeof(stream
));
341 fprintf(stderr
, "[error] Remote side has closed connection\n");
345 fprintf(stderr
, "Error receiving stream\n");
/* Stream identifiers arrive big endian and are kept in host order. */
348 session
->streams
[i
].id
= be64toh(stream
.id
);
350 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
351 session
->streams
[i
].first_read
= 1;
/* Receive buffer for this stream's packets. */
352 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
, PROT_READ
| PROT_WRITE
,
353 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
354 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
355 fprintf(stderr
, "mmap error\n");
/* Mark the stream so get_metadata() and get_next_index() can tell it apart. */
359 if (be32toh(stream
.metadata_flag
)) {
360 session
->streams
[i
].metadata_flag
= 1;
363 return session
->stream_count
;
/*
 * Request the metadata of the attached session.
 *
 * Looks through session->streams for an entry with metadata_flag set, sends
 * LTTNG_VIEWER_GET_METADATA for that stream's id, reads the
 * struct lttng_viewer_metadata_packet header, then reads rp.len bytes of
 * payload into `data`.
 *
 * main() expects a positive return value and prints it as a byte count; the
 * return statements and the allocation of `data` are not visible in this
 * listing.
 */
369 int get_metadata(void)
371 struct lttng_viewer_cmd cmd
;
372 struct lttng_viewer_get_metadata rq
;
373 struct lttng_viewer_metadata_packet rp
;
/* Index into session->streams; -1 until a metadata stream is found. */
379 int metadata_stream_id
= -1;
381 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_METADATA
);
/*
 * NOTE(review): data_size is stored in host byte order while cmd.cmd is
 * converted with htobe32 -- confirm which byte order the peer expects.
 */
382 cmd
.data_size
= sizeof(rq
);
385 for (i
= 0; i
< session
->stream_count
; i
++) {
386 if (session
->streams
[i
].metadata_flag
) {
387 metadata_stream_id
= i
;
392 if (metadata_stream_id
< 0) {
393 fprintf(stderr
, "No metadata stream found\n");
397 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
/* Command header first, request payload second. */
399 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
401 fprintf(stderr
, "Error sending cmd\n");
404 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
406 fprintf(stderr
, "Error sending get_metadata request\n");
409 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
411 fprintf(stderr
, "[error] Remote side has closed connection\n");
415 fprintf(stderr
, "Error receiving metadata response\n");
418 switch (be32toh(rp
.status
)) {
419 case LTTNG_VIEWER_METADATA_OK
:
421 case LTTNG_VIEWER_NO_NEW_METADATA
:
422 fprintf(stderr
, "NO NEW\n");
425 case LTTNG_VIEWER_METADATA_ERR
:
426 fprintf(stderr
, "ERR\n");
429 fprintf(stderr
, "UNKNOWN\n");
/* Payload length announced by the response header. */
433 len
= be64toh(rp
.len
);
440 PERROR("relay data zmalloc");
443 ret_len
= lttng_live_recv(control_sock
, data
, len
);
445 fprintf(stderr
, "[error] Remote side has closed connection\n");
446 goto error_free_data
;
449 fprintf(stderr
, "Error receiving trace packet\n");
450 goto error_free_data
;
/*
 * Walk session->streams and ask the peer for the next index of each stream
 * with LTTNG_VIEWER_GET_NEXT_INDEX.
 *
 * Entries with metadata_flag set are tested first; what that branch does is
 * not visible in this listing. On LTTNG_VIEWER_INDEX_HUP the stream's id and
 * fd are set to -1. The offset, length (packet_size / CHAR_BIT) and stream
 * index of a packet are saved in the first_packet_* globals for
 * get_data_packet().
 *
 * main() treats a return value of 0 as success; the return statements are not
 * visible in this listing.
 */
463 int get_next_index(void)
465 struct lttng_viewer_cmd cmd
;
466 struct lttng_viewer_get_next_index rq
;
467 struct lttng_viewer_index rp
;
471 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_NEXT_INDEX
);
/*
 * NOTE(review): data_size is stored in host byte order while cmd.cmd is
 * converted with htobe32 -- confirm which byte order the peer expects.
 */
472 cmd
.data_size
= sizeof(rq
);
475 for (id
= 0; id
< session
->stream_count
; id
++) {
476 if (session
->streams
[id
].metadata_flag
) {
479 memset(&rq
, 0, sizeof(rq
));
480 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
/* Command header first, request payload second. */
483 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
485 fprintf(stderr
, "Error sending cmd\n");
488 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
490 fprintf(stderr
, "Error sending get_next_index request\n");
493 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
495 fprintf(stderr
, "[error] Remote side has closed connection\n");
499 fprintf(stderr
, "Error receiving index response\n");
/* flags is converted in place; status is converted at the switch. */
503 rp
.flags
= be32toh(rp
.flags
);
505 switch (be32toh(rp
.status
)) {
506 case LTTNG_VIEWER_INDEX_INACTIVE
:
507 fprintf(stderr
, "(INACTIVE)\n");
509 case LTTNG_VIEWER_INDEX_OK
:
511 case LTTNG_VIEWER_INDEX_RETRY
:
514 case LTTNG_VIEWER_INDEX_HUP
:
515 fprintf(stderr
, "(HUP)\n");
/* Invalidate the local record of a stream the peer hung up on. */
516 session
->streams
[id
].id
= -1ULL;
517 session
->streams
[id
].fd
= -1;
519 case LTTNG_VIEWER_INDEX_ERR
:
520 fprintf(stderr
, "(ERR)\n");
523 fprintf(stderr
, "SHOULD NOT HAPPEN\n");
/*
 * NOTE(review): first_packet_stream_id starts at 0 and 0 is also a
 * possible stream index, so this test stays true after index 0 has been
 * recorded and a later stream can overwrite the saved packet -- confirm
 * whether a distinct "unset" value is needed.
 */
526 if (!first_packet_stream_id
) {
527 first_packet_offset
= be64toh(rp
.offset
);
/* packet_size is divided by CHAR_BIT to obtain the length stored here. */
528 first_packet_len
= be64toh(rp
.packet_size
) / CHAR_BIT
;
529 first_packet_stream_id
= id
;
/*
 * Fetch one trace packet for the stream at index `id` of session->streams.
 *
 * Sends LTTNG_VIEWER_GET_PACKET followed by the request, reads the
 * struct lttng_viewer_trace_packet header, and on
 * LTTNG_VIEWER_GET_PACKET_OK takes the payload length from rp.len. A payload
 * longer than mmap_size is rejected; otherwise it is received into the
 * stream's mmap_base buffer.
 *
 * NOTE(review): the rest of the parameter list, the statement that sets the
 * request offset and the return statements are not visible in this listing.
 */
539 int get_data_packet(int id
, uint64_t offset
,
542 struct lttng_viewer_cmd cmd
;
543 struct lttng_viewer_get_packet rq
;
544 struct lttng_viewer_trace_packet rp
;
547 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_PACKET
);
/*
 * NOTE(review): data_size is stored in host byte order while cmd.cmd is
 * converted with htobe32 -- confirm which byte order the peer expects.
 */
548 cmd
.data_size
= sizeof(rq
);
551 memset(&rq
, 0, sizeof(rq
));
552 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
553 /* Already in big endian. */
555 rq
.len
= htobe32(len
);
/* Command header first, request payload second. */
557 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
559 fprintf(stderr
, "Error sending cmd\n");
562 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
564 fprintf(stderr
, "Error sending get_data_packet request\n");
567 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
569 fprintf(stderr
, "[error] Remote side has closed connection\n");
573 fprintf(stderr
, "Error receiving data response\n");
/* flags is converted in place; status is converted at the switch. */
576 rp
.flags
= be32toh(rp
.flags
);
578 switch (be32toh(rp
.status
)) {
579 case LTTNG_VIEWER_GET_PACKET_OK
:
/* From here on, len is the payload size announced by the peer. */
580 len
= be32toh(rp
.len
);
582 case LTTNG_VIEWER_GET_PACKET_RETRY
:
583 fprintf(stderr
, "RETRY\n");
585 case LTTNG_VIEWER_GET_PACKET_ERR
:
586 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
587 fprintf(stderr
, "NEW_METADATA\n");
590 fprintf(stderr
, "ERR\n");
593 fprintf(stderr
, "UNKNOWN\n");
/* The receive buffer mapped in attach_session() holds mmap_size bytes. */
601 if (len
> mmap_size
) {
602 fprintf(stderr
, "mmap_size not big enough\n");
606 ret_len
= lttng_live_recv(control_sock
, session
->streams
[id
].mmap_base
, len
);
608 fprintf(stderr
, "[error] Remote side has closed connection\n");
612 fprintf(stderr
, "Error receiving trace packet\n");
/*
 * TAP test driver. Declares NUM_TESTS planned tests, then runs the viewer
 * steps in order, reporting each with ok(): connect, handshake, list
 * sessions, create viewer session, attach, get metadata, get next index, get
 * one data packet. The process exit code comes from exit_status().
 *
 * NOTE(review): argc and argv are not used in the visible lines.
 */
621 int main(int argc
, char **argv
)
626 plan_tests(NUM_TESTS
);
628 diag("Live unit tests");
630 ret
= connect_viewer("localhost");
631 ok(ret
== 0, "Connect viewer to relayd");
633 ret
= establish_connection();
634 ok(ret
== 0, "Established connection and version check with %d.%d",
635 VERSION_MAJOR
, VERSION_MINOR
);
/* session_id receives the id chosen by list_sessions(). */
637 ret
= list_sessions(&session_id
);
638 ok(ret
> 0, "List sessions : %d session(s)", ret
);
640 ret
= create_viewer_session();
641 ok(ret
== 0, "Create viewer session");
643 ret
= attach_session(session_id
);
644 ok(ret
> 0, "Attach to session, %d streams received", ret
);
646 ret
= get_metadata();
647 ok(ret
> 0, "Get metadata, received %d bytes", ret
);
649 ret
= get_next_index();
650 ok(ret
== 0, "Get one index per stream");
/* Fetch the packet get_next_index() saved in the first_packet_* globals. */
652 ret
= get_data_packet(first_packet_stream_id
, first_packet_offset
,
655 "Get one data packet for stream %d, offset %d, len %d",
656 first_packet_stream_id
, first_packet_offset
,
659 return exit_status();