2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
12 #include <common/compat/time.hpp>
13 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
23 #include <lttng/lttng.h>
25 #include <urcu/list.h>
26 #include <common/common.hpp>
28 #include <bin/lttng-relayd/lttng-viewer-abi.hpp>
29 #include <common/index/ctf-index.hpp>
31 #include <common/compat/errno.hpp>
32 #include <common/compat/endian.hpp>
36 #define SESSION1 "test1"
37 #define RELAYD_URL "net://localhost"
38 #define LIVE_TIMER 2000000
40 /* Number of TAP tests in this file */
42 #define mmap_size 524288
44 #ifdef HAVE_LIBLTTNG_UST_CTL
45 #include <lttng/lttng-export.h>
46 #include <lttng/ust-sigbus.h>
47 LTTNG_EXPORT
DEFINE_LTTNG_UST_SIGBUS_STATE();
/*
 * File-scope state shared by the helpers in this test:
 *   - session: the live session object filled in by attach_session().
 *   - first_packet_offset / first_packet_stream_id: recorded by
 *     get_next_index() the first time it runs while
 *     first_packet_stream_id is still negative (it starts at -1).
 * NOTE(review): first_packet_offset is an int, yet get_next_index()
 * assigns it from be64toh(rp.offset) -- confirm the narrowing is
 * acceptable for the offsets this test sees.
 */
51 struct live_session
*session
;
54 int first_packet_offset
;
56 int first_packet_stream_id
= -1;
/*
 * Bookkeeping types for the streams of the attached session.
 * NOTE(review): this listing omits several lines of these definitions
 * (other fields, closing braces, and the header of the second struct),
 * so only the visible members are described here.
 */
58 struct viewer_stream
{
60 uint64_t ctf_trace_id
;
/*
 * Presumably members of struct live_session: the rest of the file reads
 * session->streams and session->stream_count through the global
 * `struct live_session *session` -- confirm against the full file.
 */
69 struct viewer_stream
*streams
;
70 uint64_t live_timer_interval
;
71 uint64_t stream_count
;
/*
 * Receive data from socket fd into buf with recv(), initially asking for
 * len bytes.
 *
 * The loop keeps calling recv() while the previous call returned data and
 * to_copy is still non-zero, and also retries when recv() failed with
 * errno == EINTR. Each successful read is asserted not to exceed the
 * amount still requested.
 *
 * Per the note at the end of the body: a result of 0 means orderly
 * shutdown by the peer, a negative result is an error.
 * NOTE(review): the lines updating copied/to_copy and producing the final
 * return value are not visible in this listing -- confirm the exact
 * return contract before relying on it.
 */
76 ssize_t
lttng_live_recv(int fd
, void *buf
, size_t len
)
79 size_t copied
= 0, to_copy
= len
;
82 ret
= recv(fd
, (char *) buf
+ copied
, to_copy
, 0);
84 LTTNG_ASSERT(ret
<= to_copy
);
88 } while ((ret
> 0 && to_copy
> 0)
89 || (ret
< 0 && errno
== EINTR
));
92 /* ret = 0 means orderly shutdown, ret < 0 is error. */
/*
 * Send len bytes from buf on socket fd with send(), passing MSG_NOSIGNAL,
 * and repeat the call for as long as it fails with errno == EINTR.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the last send() result is returned. The visible loop only
 * retries on EINTR, so a short write does not appear to be resumed --
 * callers compare the result against the full size.
 */
97 ssize_t
lttng_live_send(int fd
, const void *buf
, size_t len
)
102 ret
= send(fd
, buf
, len
, MSG_NOSIGNAL
);
103 } while (ret
< 0 && errno
== EINTR
);
/*
 * Resolve hostname with gethostbyname(), create the TCP socket stored in
 * control_sock and connect it to port 5344 on the resolved address.
 *
 * The test driver treats a return value of 0 as success.
 * NOTE(review): host is dereferenced (host->h_addr) below; the NULL check
 * on the gethostbyname() result is not visible in this listing -- confirm
 * it exists. gethostbyname() is also obsolescent; getaddrinfo() is the
 * modern replacement.
 */
108 int connect_viewer(const char *hostname
)
110 struct hostent
*host
;
111 struct sockaddr_in server_addr
;
114 host
= gethostbyname(hostname
);
120 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
/* Address of the first endpoint: IPv4, port 5344, padding zeroed. */
126 server_addr
.sin_family
= AF_INET
;
127 server_addr
.sin_port
= htons(5344);
128 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
129 bzero(&(server_addr
.sin_zero
), 8);
131 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
132 sizeof(struct sockaddr
)) == -1) {
/*
 * server_addr is then re-filled for port 5345.
 * NOTE(review): no socket()/connect() call using this second address is
 * visible in this listing -- confirm what it is used for.
 */
138 server_addr
.sin_family
= AF_INET
;
139 server_addr
.sin_port
= htons(5345);
140 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
141 bzero(&(server_addr
.sin_zero
), 8);
/*
 * Perform the viewer handshake on control_sock.
 *
 * Sends an LTTNG_VIEWER_CONNECT command header followed by a
 * lttng_viewer_connect payload carrying VERSION_MAJOR / VERSION_MINOR and
 * the LTTNG_VIEWER_CLIENT_COMMAND type, all converted to big endian, then
 * reads the peer's lttng_viewer_connect reply into the same structure.
 *
 * The test driver treats a return value of 0 as success.
 * NOTE(review): the conditions guarding each diag() call and the return
 * statements are not visible in this listing.
 */
150 int establish_connection(void)
152 struct lttng_viewer_cmd cmd
;
153 struct lttng_viewer_connect connect
;
/* Command header: payload size is the connect structure that follows. */
156 cmd
.cmd
= htobe32(LTTNG_VIEWER_CONNECT
);
157 cmd
.data_size
= htobe64(sizeof(connect
));
158 cmd
.cmd_version
= htobe32(0);
/* Zero the payload first so fields not set below are sent as zero. */
160 memset(&connect
, 0, sizeof(connect
));
161 connect
.major
= htobe32(VERSION_MAJOR
);
162 connect
.minor
= htobe32(VERSION_MINOR
);
163 connect
.type
= htobe32(LTTNG_VIEWER_CLIENT_COMMAND
);
165 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
167 diag("Error sending cmd");
170 ret_len
= lttng_live_send(control_sock
, &connect
, sizeof(connect
));
172 diag("Error sending version");
/* The reply overwrites the request structure. */
176 ret_len
= lttng_live_recv(control_sock
, &connect
, sizeof(connect
));
178 diag("[error] Remote side has closed connection");
182 diag("Error receiving version");
192 * Returns the number of sessions, should be 1 during the unit test.
/*
 * Ask the peer for its session list and report the first usable session.
 *
 * Sends LTTNG_VIEWER_LIST_SESSIONS (no payload), reads the list header,
 * then reads one lttng_viewer_session record per advertised session. The
 * id of the first record whose streams field is non-zero is written to
 * *session_id.
 *
 * Returns the session count from the list header, converted to host
 * order.
 * NOTE(review): lsession.streams is compared without a be32toh()
 * conversion while the other wire fields are converted -- the "> 0" test
 * gives the same answer either way, but confirm it is intentional.
 * NOTE(review): first_session is an int yet receives be64toh(lsession.id)
 * before being copied to the uint64_t output -- ids above INT_MAX would be
 * truncated.
 */
195 int list_sessions(uint64_t *session_id
)
197 struct lttng_viewer_cmd cmd
;
198 struct lttng_viewer_list_sessions list
;
199 struct lttng_viewer_session lsession
;
202 int first_session
= 0;
204 cmd
.cmd
= htobe32(LTTNG_VIEWER_LIST_SESSIONS
);
205 cmd
.data_size
= htobe64(0);
206 cmd
.cmd_version
= htobe32(0);
208 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
210 diag("Error sending cmd");
214 ret_len
= lttng_live_recv(control_sock
, &list
, sizeof(list
));
216 diag("[error] Remote side has closed connection");
220 diag("Error receiving session list");
/* One fixed-size session record follows the header per session. */
224 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
225 ret_len
= lttng_live_recv(control_sock
, &lsession
, sizeof(lsession
));
227 diag("Error receiving session");
/* Keep only the first session that has streams. */
230 if (lsession
.streams
> 0 && first_session
<= 0) {
231 first_session
= be64toh(lsession
.id
);
232 *session_id
= first_session
;
236 return be32toh(list
.sessions_count
);
/*
 * Ask the peer to create a viewer session.
 *
 * Sends LTTNG_VIEWER_CREATE_SESSION (no payload), reads the
 * lttng_viewer_create_session_response and checks that its status is
 * LTTNG_VIEWER_CREATE_SESSION_OK. Both transfers are asserted to have
 * moved the full structure size.
 *
 * The test driver treats a return value of 0 as success.
 * NOTE(review): the conditions guarding each diag() call and the return
 * statements are not visible in this listing.
 */
243 int create_viewer_session(void)
245 struct lttng_viewer_cmd cmd
;
246 struct lttng_viewer_create_session_response resp
;
249 cmd
.cmd
= htobe32(LTTNG_VIEWER_CREATE_SESSION
);
250 cmd
.data_size
= htobe64(0);
251 cmd
.cmd_version
= htobe32(0);
253 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
255 diag("[error] Error sending cmd");
258 LTTNG_ASSERT(ret_len
== sizeof(cmd
));
260 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
262 diag("[error] Remote side has closed connection");
266 diag("[error] Error receiving create session reply");
269 LTTNG_ASSERT(ret_len
== sizeof(resp
));
271 if (be32toh(resp
.status
) != LTTNG_VIEWER_CREATE_SESSION_OK
) {
272 diag("[error] Error creating viewer session");
/*
 * Attach to session `id` and record its streams in the global `session`.
 *
 * Steps visible in this listing:
 *   1. Allocate a fresh live_session and store it in the global.
 *   2. Send LTTNG_VIEWER_ATTACH_SESSION plus a request carrying the
 *      session id and LTTNG_VIEWER_SEEK_BEGINNING.
 *   3. Read the response, require LTTNG_VIEWER_ATTACH_OK, and take the
 *      stream count from it (a count of 0 is reported via diag()).
 *   4. Allocate the streams array, then for each stream read one
 *      lttng_viewer_stream record, store its id and ctf_trace_id, set
 *      first_read, map an anonymous private read/write buffer of
 *      mmap_size bytes, and set metadata_flag when the record has it.
 *
 * Returns session->stream_count on the path shown at the end.
 * NOTE(review): each call overwrites the global session pointer, and the
 * test driver calls this function twice; no release of the previous
 * object or its mappings is visible in this listing -- confirm.
 * NOTE(review): error-path returns are not visible in this listing.
 */
282 int attach_session(uint64_t id
)
284 struct lttng_viewer_cmd cmd
;
285 struct lttng_viewer_attach_session_request rq
;
286 struct lttng_viewer_attach_session_response rp
;
287 struct lttng_viewer_stream stream
;
291 session
= zmalloc
<live_session
>();
296 cmd
.cmd
= htobe32(LTTNG_VIEWER_ATTACH_SESSION
);
297 cmd
.data_size
= htobe64(sizeof(rq
));
298 cmd
.cmd_version
= htobe32(0);
300 memset(&rq
, 0, sizeof(rq
));
301 rq
.session_id
= htobe64(id
);
302 rq
.seek
= htobe32(LTTNG_VIEWER_SEEK_BEGINNING
);
304 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
306 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
309 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
311 diag("Error sending attach request");
315 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
317 diag("[error] Remote side has closed connection");
321 diag("Error receiving attach response");
324 if (be32toh(rp
.status
) != LTTNG_VIEWER_ATTACH_OK
) {
328 session
->stream_count
= be32toh(rp
.streams_count
);
329 if (session
->stream_count
== 0) {
330 diag("Got session stream count == 0");
333 session
->streams
= calloc
<viewer_stream
>(session
->stream_count
);
334 if (!session
->streams
) {
/* One lttng_viewer_stream record follows the response per stream. */
338 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
339 ret_len
= lttng_live_recv(control_sock
, &stream
, sizeof(stream
));
341 diag("[error] Remote side has closed connection");
345 diag("Error receiving stream");
348 session
->streams
[i
].id
= be64toh(stream
.id
);
350 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
351 session
->streams
[i
].first_read
= 1;
/* Per-stream buffer that get_data_packet() later receives into. */
352 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
,
353 PROT_READ
| PROT_WRITE
,
354 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
355 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
360 if (be32toh(stream
.metadata_flag
)) {
361 session
->streams
[i
].metadata_flag
= 1;
364 return session
->stream_count
;
/*
 * Fetch the metadata packet of the attached session.
 *
 * Scans session->streams for an entry with metadata_flag set and
 * remembers its index ("No metadata stream found" is reported when none
 * exists). Sends LTTNG_VIEWER_GET_METADATA with that stream's id, reads
 * the lttng_viewer_metadata_packet header, dispatches on its status, then
 * allocates a buffer of be64toh(rp.len) bytes and receives the payload
 * into it. Failures while receiving the payload jump to error_free_data.
 *
 * The test driver requires a positive return value and prints it as the
 * number of bytes received.
 * NOTE(review): the return statements and the code at error_free_data are
 * not visible in this listing.
 * NOTE(review): the PERROR text says "zmalloc" although the buffer is
 * obtained with calloc<char>() -- the message looks stale.
 */
371 int get_metadata(void)
373 struct lttng_viewer_cmd cmd
;
374 struct lttng_viewer_get_metadata rq
;
375 struct lttng_viewer_metadata_packet rp
;
381 int metadata_stream_id
= -1;
383 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_METADATA
);
384 cmd
.data_size
= htobe64(sizeof(rq
));
385 cmd
.cmd_version
= htobe32(0);
/* Locate the metadata stream among the attached streams. */
387 for (i
= 0; i
< session
->stream_count
; i
++) {
388 if (session
->streams
[i
].metadata_flag
) {
389 metadata_stream_id
= i
;
394 if (metadata_stream_id
< 0) {
395 diag("No metadata stream found");
399 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
402 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
404 diag("Error sending cmd");
407 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
409 diag("Error sending get_metadata request");
412 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
414 diag("[error] Remote side has closed connection");
418 diag("Error receiving metadata response");
421 switch (be32toh(rp
.status
)) {
422 case LTTNG_VIEWER_METADATA_OK
:
424 case LTTNG_VIEWER_NO_NEW_METADATA
:
425 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
428 case LTTNG_VIEWER_METADATA_ERR
:
429 diag("Got LTTNG_VIEWER_METADATA_ERR:");
432 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
/* Payload length announced by the header, in host order. */
436 len
= be64toh(rp
.len
);
441 data
= calloc
<char>(len
);
443 PERROR("relay data zmalloc");
446 ret_len
= lttng_live_recv(control_sock
, data
, len
);
448 diag("[error] Remote side has closed connection");
449 goto error_free_data
;
452 diag("Error receiving trace packet");
453 goto error_free_data
;
/*
 * Request the next packet index for the streams of the attached session.
 *
 * Iterates over session->streams; metadata_flag is tested per stream
 * (presumably to skip the metadata stream -- the body of that branch is
 * not visible in this listing). For each stream it sends
 * LTTNG_VIEWER_GET_NEXT_INDEX with the stream id, reads the
 * lttng_viewer_index reply, converts rp.flags to host order in place and
 * dispatches on the status. On LTTNG_VIEWER_INDEX_HUP the stream's id is
 * set to -1ULL and its fd to -1.
 *
 * While first_packet_stream_id is still negative, the globals
 * first_packet_offset, first_packet_len and first_packet_stream_id are
 * filled from the reply (packet_size is divided by CHAR_BIT) and
 * reported via diag().
 *
 * The test driver treats a return value of 0 as success.
 * NOTE(review): first_packet_offset is an int but is assigned
 * be64toh(rp.offset) -- large offsets would be truncated.
 * NOTE(review): the per-case control flow (break/goto/retry) and the
 * return statements are not visible in this listing.
 */
467 int get_next_index(void)
469 struct lttng_viewer_cmd cmd
;
470 struct lttng_viewer_get_next_index rq
;
471 struct lttng_viewer_index rp
;
475 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_NEXT_INDEX
);
476 cmd
.data_size
= htobe64(sizeof(rq
));
477 cmd
.cmd_version
= htobe32(0);
479 for (id
= 0; id
< session
->stream_count
; id
++) {
480 if (session
->streams
[id
].metadata_flag
) {
483 memset(&rq
, 0, sizeof(rq
));
484 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
487 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
489 diag("Error sending cmd");
492 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
494 diag("Error sending get_next_index request");
497 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
499 diag("[error] Remote side has closed connection");
503 diag("Error receiving index response");
/* Flags are converted in place; status is converted at the switch. */
507 rp
.flags
= be32toh(rp
.flags
);
509 switch (be32toh(rp
.status
)) {
510 case LTTNG_VIEWER_INDEX_INACTIVE
:
511 /* Skip this stream. */
512 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
514 case LTTNG_VIEWER_INDEX_OK
:
516 case LTTNG_VIEWER_INDEX_RETRY
:
519 case LTTNG_VIEWER_INDEX_HUP
:
520 diag("Got LTTNG_VIEWER_INDEX_HUP");
521 session
->streams
[id
].id
= -1ULL;
522 session
->streams
[id
].fd
= -1;
524 case LTTNG_VIEWER_INDEX_ERR
:
525 diag("Got LTTNG_VIEWER_INDEX_ERR");
528 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)", be32toh(rp
.status
));
531 if (first_packet_stream_id
< 0) {
533 * Initialize the first packet stream id. That is,
534 * the first active stream encoutered.
536 first_packet_offset
= be64toh(rp
.offset
);
537 first_packet_len
= be64toh(rp
.packet_size
) / CHAR_BIT
;
538 first_packet_stream_id
= id
;
539 diag("Got first packet index with offset %d and len %d",
540 first_packet_offset
, first_packet_len
);
/*
 * Fetch one trace packet for stream index `id` into that stream's mapped
 * buffer.
 *
 * Sends LTTNG_VIEWER_GET_PACKET with the stream id and a big-endian
 * length, reads the lttng_viewer_trace_packet header, converts rp.flags
 * in place and dispatches on the status. On LTTNG_VIEWER_GET_PACKET_OK
 * the payload length is taken from the reply. A length above mmap_size
 * is rejected ("mmap_size not big enough"); otherwise the payload is
 * received into session->streams[id].mmap_base.
 *
 * NOTE(review): the end of the parameter list is missing from this
 * listing; `len` is used below and is presumably the third parameter --
 * confirm. The line following the "Already in big endian" note (likely
 * the offset assignment) is also not visible.
 * NOTE(review): return statements are not visible in this listing.
 */
550 int get_data_packet(int id
, uint64_t offset
,
553 struct lttng_viewer_cmd cmd
;
554 struct lttng_viewer_get_packet rq
;
555 struct lttng_viewer_trace_packet rp
;
558 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_PACKET
);
559 cmd
.data_size
= htobe64(sizeof(rq
));
560 cmd
.cmd_version
= htobe32(0);
562 memset(&rq
, 0, sizeof(rq
));
563 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
564 /* Already in big endian. */
566 rq
.len
= htobe32(len
);
568 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
570 diag("Error sending cmd");
573 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
575 diag("Error sending get_data_packet request");
578 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
580 diag("[error] Remote side has closed connection");
584 diag("Error receiving data response");
587 rp
.flags
= be32toh(rp
.flags
);
589 switch (be32toh(rp
.status
)) {
590 case LTTNG_VIEWER_GET_PACKET_OK
:
/* From here on len is the size announced by the peer. */
591 len
= be32toh(rp
.len
);
593 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
597 case LTTNG_VIEWER_GET_PACKET_RETRY
:
598 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
600 case LTTNG_VIEWER_GET_PACKET_ERR
:
601 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
602 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
605 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
608 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
/* Guard the fixed-size mapping before receiving the payload into it. */
612 if (len
> mmap_size
) {
613 diag("mmap_size not big enough");
617 ret_len
= lttng_live_recv(control_sock
, session
->streams
[id
].mmap_base
, len
);
619 diag("[error] Remote side has closed connection");
623 diag("Error receiving trace packet");
/*
 * Detach the viewer from session `id`.
 *
 * Sends LTTNG_VIEWER_DETACH_SESSION plus a request carrying the session
 * id, reads the lttng_viewer_detach_session_response and checks that its
 * status is LTTNG_VIEWER_DETACH_SESSION_OK.
 *
 * The test driver treats a return value of 0 as success.
 * NOTE(review): unlike the sibling helpers, errors here are written with
 * fprintf(stderr, ...) instead of diag().
 * NOTE(review): the message printed after sending the detach request
 * reads "Error sending attach request" -- it looks copied from
 * attach_session().
 * NOTE(review): the conditions guarding each message and the return
 * statements are not visible in this listing.
 */
633 int detach_viewer_session(uint64_t id
)
635 struct lttng_viewer_cmd cmd
;
636 struct lttng_viewer_detach_session_response resp
;
637 struct lttng_viewer_detach_session_request rq
;
641 cmd
.cmd
= htobe32(LTTNG_VIEWER_DETACH_SESSION
);
642 cmd
.data_size
= htobe64(sizeof(rq
));
643 cmd
.cmd_version
= htobe32(0);
645 memset(&rq
, 0, sizeof(rq
));
646 rq
.session_id
= htobe64(id
);
648 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
650 fprintf(stderr
, "[error] Error sending cmd\n");
655 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
657 fprintf(stderr
, "Error sending attach request\n");
662 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
664 fprintf(stderr
, "[error] Error receiving detach session reply\n");
669 if (be32toh(resp
.status
) != LTTNG_VIEWER_DETACH_SESSION_OK
) {
670 fprintf(stderr
, "[error] Error detaching viewer session\n");
/*
 * Body of the test driver (presumably main(); the function header is not
 * visible in this listing).
 *
 * Declares NUM_TESTS TAP tests, then exercises the helpers in order:
 * connect, handshake, list sessions, create viewer session, attach,
 * fetch metadata, fetch the next index, fetch one data packet, detach,
 * and finally list and attach again to check the session can be
 * re-attached. Each step's result is checked with ok(), and the process
 * result is exit_status().
 */
685 plan_tests(NUM_TESTS
);
687 diag("Live unit tests");
689 ret
= connect_viewer("localhost");
690 ok(ret
== 0, "Connect viewer to relayd");
692 ret
= establish_connection();
693 ok(ret
== 0, "Established connection and version check with %d.%d",
694 VERSION_MAJOR
, VERSION_MINOR
);
696 ret
= list_sessions(&session_id
);
697 ok(ret
> 0, "List sessions : %d session(s)", ret
);
702 ret
= create_viewer_session();
703 ok(ret
== 0, "Create viewer session");
705 ret
= attach_session(session_id
);
706 ok(ret
> 0, "Attach to session, %d stream(s) received", ret
);
708 ret
= get_metadata();
709 ok(ret
> 0, "Get metadata, received %d bytes", ret
);
711 ret
= get_next_index();
712 ok(ret
== 0, "Get one index per stream");
/* Uses the stream and offset recorded by get_next_index(). */
714 ret
= get_data_packet(first_packet_stream_id
, first_packet_offset
,
717 "Get one data packet for stream %d, offset %d, len %d",
718 first_packet_stream_id
, first_packet_offset
,
721 ret
= detach_viewer_session(session_id
);
722 ok(ret
== 0, "Detach viewer session");
/* After detaching, the session must still be listed and attachable. */
724 ret
= list_sessions(&session_id
);
725 ok(ret
> 0, "List sessions : %d session(s)", ret
);
727 ret
= attach_session(session_id
);
728 ok(ret
> 0, "Attach to session, %d streams received", ret
);
730 return exit_status();