2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
12 #include <common/compat/time.h>
13 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
24 #include <lttng/lttng.h>
26 #include <urcu/list.h>
27 #include <common/common.h>
29 #include <bin/lttng-relayd/lttng-viewer-abi.h>
30 #include <common/index/ctf-index.h>
32 #include <common/compat/errno.h>
33 #include <common/compat/endian.h>
35 #define SESSION1 "test1"
36 #define RELAYD_URL "net://localhost"
37 #define LIVE_TIMER 2000000
39 /* Number of TAP tests in this file */
41 #define mmap_size 524288
43 #ifdef HAVE_LIBLTTNG_UST_CTL
44 #include <lttng/ust-sigbus.h>
45 DEFINE_LTTNG_UST_SIGBUS_STATE();
48 static int control_sock
;
49 struct live_session
*session
;
51 static int first_packet_offset
;
52 static int first_packet_len
;
53 static int first_packet_stream_id
= -1;
55 struct viewer_stream
{
57 uint64_t ctf_trace_id
;
66 struct viewer_stream
*streams
;
67 uint64_t live_timer_interval
;
68 uint64_t stream_count
;
72 ssize_t
lttng_live_recv(int fd
, void *buf
, size_t len
)
75 size_t copied
= 0, to_copy
= len
;
78 ret
= recv(fd
, buf
+ copied
, to_copy
, 0);
80 LTTNG_ASSERT(ret
<= to_copy
);
84 } while ((ret
> 0 && to_copy
> 0)
85 || (ret
< 0 && errno
== EINTR
));
88 /* ret = 0 means orderly shutdown, ret < 0 is error. */
93 ssize_t
lttng_live_send(int fd
, const void *buf
, size_t len
)
98 ret
= send(fd
, buf
, len
, MSG_NOSIGNAL
);
99 } while (ret
< 0 && errno
== EINTR
);
104 int connect_viewer(const char *hostname
)
106 struct hostent
*host
;
107 struct sockaddr_in server_addr
;
110 host
= gethostbyname(hostname
);
116 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
122 server_addr
.sin_family
= AF_INET
;
123 server_addr
.sin_port
= htons(5344);
124 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
125 bzero(&(server_addr
.sin_zero
), 8);
127 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
128 sizeof(struct sockaddr
)) == -1) {
134 server_addr
.sin_family
= AF_INET
;
135 server_addr
.sin_port
= htons(5345);
136 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
137 bzero(&(server_addr
.sin_zero
), 8);
146 int establish_connection(void)
148 struct lttng_viewer_cmd cmd
;
149 struct lttng_viewer_connect connect
;
152 cmd
.cmd
= htobe32(LTTNG_VIEWER_CONNECT
);
153 cmd
.data_size
= htobe64(sizeof(connect
));
154 cmd
.cmd_version
= htobe32(0);
156 memset(&connect
, 0, sizeof(connect
));
157 connect
.major
= htobe32(VERSION_MAJOR
);
158 connect
.minor
= htobe32(VERSION_MINOR
);
159 connect
.type
= htobe32(LTTNG_VIEWER_CLIENT_COMMAND
);
161 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
163 diag("Error sending cmd");
166 ret_len
= lttng_live_send(control_sock
, &connect
, sizeof(connect
));
168 diag("Error sending version");
172 ret_len
= lttng_live_recv(control_sock
, &connect
, sizeof(connect
));
174 diag("[error] Remote side has closed connection");
178 diag("Error receiving version");
188 * Returns the number of sessions, should be 1 during the unit test.
191 int list_sessions(uint64_t *session_id
)
193 struct lttng_viewer_cmd cmd
;
194 struct lttng_viewer_list_sessions list
;
195 struct lttng_viewer_session lsession
;
198 int first_session
= 0;
200 cmd
.cmd
= htobe32(LTTNG_VIEWER_LIST_SESSIONS
);
201 cmd
.data_size
= htobe64(0);
202 cmd
.cmd_version
= htobe32(0);
204 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
206 diag("Error sending cmd");
210 ret_len
= lttng_live_recv(control_sock
, &list
, sizeof(list
));
212 diag("[error] Remote side has closed connection");
216 diag("Error receiving session list");
220 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
221 ret_len
= lttng_live_recv(control_sock
, &lsession
, sizeof(lsession
));
223 diag("Error receiving session");
226 if (lsession
.streams
> 0 && first_session
<= 0) {
227 first_session
= be64toh(lsession
.id
);
228 *session_id
= first_session
;
232 return be32toh(list
.sessions_count
);
239 int create_viewer_session(void)
241 struct lttng_viewer_cmd cmd
;
242 struct lttng_viewer_create_session_response resp
;
245 cmd
.cmd
= htobe32(LTTNG_VIEWER_CREATE_SESSION
);
246 cmd
.data_size
= htobe64(0);
247 cmd
.cmd_version
= htobe32(0);
249 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
251 diag("[error] Error sending cmd");
254 LTTNG_ASSERT(ret_len
== sizeof(cmd
));
256 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
258 diag("[error] Remote side has closed connection");
262 diag("[error] Error receiving create session reply");
265 LTTNG_ASSERT(ret_len
== sizeof(resp
));
267 if (be32toh(resp
.status
) != LTTNG_VIEWER_CREATE_SESSION_OK
) {
268 diag("[error] Error creating viewer session");
278 int attach_session(uint64_t id
)
280 struct lttng_viewer_cmd cmd
;
281 struct lttng_viewer_attach_session_request rq
;
282 struct lttng_viewer_attach_session_response rp
;
283 struct lttng_viewer_stream stream
;
287 session
= zmalloc(sizeof(struct live_session
));
292 cmd
.cmd
= htobe32(LTTNG_VIEWER_ATTACH_SESSION
);
293 cmd
.data_size
= htobe64(sizeof(rq
));
294 cmd
.cmd_version
= htobe32(0);
296 memset(&rq
, 0, sizeof(rq
));
297 rq
.session_id
= htobe64(id
);
298 rq
.seek
= htobe32(LTTNG_VIEWER_SEEK_BEGINNING
);
300 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
302 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
305 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
307 diag("Error sending attach request");
311 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
313 diag("[error] Remote side has closed connection");
317 diag("Error receiving attach response");
320 if (be32toh(rp
.status
) != LTTNG_VIEWER_ATTACH_OK
) {
324 session
->stream_count
= be32toh(rp
.streams_count
);
325 if (session
->stream_count
== 0) {
326 diag("Got session stream count == 0");
329 session
->streams
= zmalloc(session
->stream_count
*
330 sizeof(struct viewer_stream
));
331 if (!session
->streams
) {
335 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
336 ret_len
= lttng_live_recv(control_sock
, &stream
, sizeof(stream
));
338 diag("[error] Remote side has closed connection");
342 diag("Error receiving stream");
345 session
->streams
[i
].id
= be64toh(stream
.id
);
347 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
348 session
->streams
[i
].first_read
= 1;
349 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
,
350 PROT_READ
| PROT_WRITE
,
351 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
352 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
357 if (be32toh(stream
.metadata_flag
)) {
358 session
->streams
[i
].metadata_flag
= 1;
361 return session
->stream_count
;
368 int get_metadata(void)
370 struct lttng_viewer_cmd cmd
;
371 struct lttng_viewer_get_metadata rq
;
372 struct lttng_viewer_metadata_packet rp
;
378 int metadata_stream_id
= -1;
380 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_METADATA
);
381 cmd
.data_size
= htobe64(sizeof(rq
));
382 cmd
.cmd_version
= htobe32(0);
384 for (i
= 0; i
< session
->stream_count
; i
++) {
385 if (session
->streams
[i
].metadata_flag
) {
386 metadata_stream_id
= i
;
391 if (metadata_stream_id
< 0) {
392 diag("No metadata stream found");
396 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
399 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
401 diag("Error sending cmd");
404 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
406 diag("Error sending get_metadata request");
409 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
411 diag("[error] Remote side has closed connection");
415 diag("Error receiving metadata response");
418 switch (be32toh(rp
.status
)) {
419 case LTTNG_VIEWER_METADATA_OK
:
421 case LTTNG_VIEWER_NO_NEW_METADATA
:
422 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
425 case LTTNG_VIEWER_METADATA_ERR
:
426 diag("Got LTTNG_VIEWER_METADATA_ERR:");
429 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
433 len
= be64toh(rp
.len
);
440 PERROR("relay data zmalloc");
443 ret_len
= lttng_live_recv(control_sock
, data
, len
);
445 diag("[error] Remote side has closed connection");
446 goto error_free_data
;
449 diag("Error receiving trace packet");
450 goto error_free_data
;
464 int get_next_index(void)
466 struct lttng_viewer_cmd cmd
;
467 struct lttng_viewer_get_next_index rq
;
468 struct lttng_viewer_index rp
;
472 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_NEXT_INDEX
);
473 cmd
.data_size
= htobe64(sizeof(rq
));
474 cmd
.cmd_version
= htobe32(0);
476 for (id
= 0; id
< session
->stream_count
; id
++) {
477 if (session
->streams
[id
].metadata_flag
) {
480 memset(&rq
, 0, sizeof(rq
));
481 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
484 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
486 diag("Error sending cmd");
489 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
491 diag("Error sending get_next_index request");
494 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
496 diag("[error] Remote side has closed connection");
500 diag("Error receiving index response");
504 rp
.flags
= be32toh(rp
.flags
);
506 switch (be32toh(rp
.status
)) {
507 case LTTNG_VIEWER_INDEX_INACTIVE
:
508 /* Skip this stream. */
509 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
511 case LTTNG_VIEWER_INDEX_OK
:
513 case LTTNG_VIEWER_INDEX_RETRY
:
516 case LTTNG_VIEWER_INDEX_HUP
:
517 diag("Got LTTNG_VIEWER_INDEX_HUP");
518 session
->streams
[id
].id
= -1ULL;
519 session
->streams
[id
].fd
= -1;
521 case LTTNG_VIEWER_INDEX_ERR
:
522 diag("Got LTTNG_VIEWER_INDEX_ERR");
525 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)", be32toh(rp
.status
));
528 if (first_packet_stream_id
< 0) {
530 * Initialize the first packet stream id. That is,
531 * the first active stream encoutered.
533 first_packet_offset
= be64toh(rp
.offset
);
534 first_packet_len
= be64toh(rp
.packet_size
) / CHAR_BIT
;
535 first_packet_stream_id
= id
;
536 diag("Got first packet index with offset %d and len %d",
537 first_packet_offset
, first_packet_len
);
547 int get_data_packet(int id
, uint64_t offset
,
550 struct lttng_viewer_cmd cmd
;
551 struct lttng_viewer_get_packet rq
;
552 struct lttng_viewer_trace_packet rp
;
555 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_PACKET
);
556 cmd
.data_size
= htobe64(sizeof(rq
));
557 cmd
.cmd_version
= htobe32(0);
559 memset(&rq
, 0, sizeof(rq
));
560 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
561 /* Already in big endian. */
563 rq
.len
= htobe32(len
);
565 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
567 diag("Error sending cmd");
570 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
572 diag("Error sending get_data_packet request");
575 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
577 diag("[error] Remote side has closed connection");
581 diag("Error receiving data response");
584 rp
.flags
= be32toh(rp
.flags
);
586 switch (be32toh(rp
.status
)) {
587 case LTTNG_VIEWER_GET_PACKET_OK
:
588 len
= be32toh(rp
.len
);
590 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
594 case LTTNG_VIEWER_GET_PACKET_RETRY
:
595 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
597 case LTTNG_VIEWER_GET_PACKET_ERR
:
598 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
599 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
602 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
605 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
609 if (len
> mmap_size
) {
610 diag("mmap_size not big enough");
614 ret_len
= lttng_live_recv(control_sock
, session
->streams
[id
].mmap_base
, len
);
616 diag("[error] Remote side has closed connection");
620 diag("Error receiving trace packet");
630 int detach_viewer_session(uint64_t id
)
632 struct lttng_viewer_cmd cmd
;
633 struct lttng_viewer_detach_session_response resp
;
634 struct lttng_viewer_detach_session_request rq
;
638 cmd
.cmd
= htobe32(LTTNG_VIEWER_DETACH_SESSION
);
639 cmd
.data_size
= htobe64(sizeof(rq
));
640 cmd
.cmd_version
= htobe32(0);
642 memset(&rq
, 0, sizeof(rq
));
643 rq
.session_id
= htobe64(id
);
645 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
647 fprintf(stderr
, "[error] Error sending cmd\n");
652 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
654 fprintf(stderr
, "Error sending attach request\n");
659 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
661 fprintf(stderr
, "[error] Error receiving detach session reply\n");
666 if (be32toh(resp
.status
) != LTTNG_VIEWER_DETACH_SESSION_OK
) {
667 fprintf(stderr
, "[error] Error detaching viewer session\n");
677 int main(int argc
, char **argv
)
682 plan_tests(NUM_TESTS
);
684 diag("Live unit tests");
686 ret
= connect_viewer("localhost");
687 ok(ret
== 0, "Connect viewer to relayd");
689 ret
= establish_connection();
690 ok(ret
== 0, "Established connection and version check with %d.%d",
691 VERSION_MAJOR
, VERSION_MINOR
);
693 ret
= list_sessions(&session_id
);
694 ok(ret
> 0, "List sessions : %d session(s)", ret
);
699 ret
= create_viewer_session();
700 ok(ret
== 0, "Create viewer session");
702 ret
= attach_session(session_id
);
703 ok(ret
> 0, "Attach to session, %d stream(s) received", ret
);
705 ret
= get_metadata();
706 ok(ret
> 0, "Get metadata, received %d bytes", ret
);
708 ret
= get_next_index();
709 ok(ret
== 0, "Get one index per stream");
711 ret
= get_data_packet(first_packet_stream_id
, first_packet_offset
,
714 "Get one data packet for stream %d, offset %d, len %d",
715 first_packet_stream_id
, first_packet_offset
,
718 ret
= detach_viewer_session(session_id
);
719 ok(ret
== 0, "Detach viewer session");
721 ret
= list_sessions(&session_id
);
722 ok(ret
> 0, "List sessions : %d session(s)", ret
);
724 ret
= attach_session(session_id
);
725 ok(ret
> 0, "Attach to session, %d streams received", ret
);
727 return exit_status();