2 * Copyright (c) - 2013 Julien Desfossez <jdesfossez@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as
6 * published by the Free Software Foundation; only version 2 of the License.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 #include <common/compat/time.h>
25 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
36 #include <lttng/lttng.h>
38 #include <urcu/list.h>
39 #include <common/common.h>
41 #include <bin/lttng-relayd/lttng-viewer-abi.h>
42 #include <common/index/ctf-index.h>
44 #include <common/compat/endian.h>
/* Session name constant; not referenced in the visible part of this file. */
46 #define SESSION1 "test1"
/* Relay daemon URL constant; not referenced in the visible part of this file. */
47 #define RELAYD_URL "net://localhost"
/* NOTE(review): unit of this timer value is not shown here (presumably microseconds) -- confirm. */
48 #define LIVE_TIMER 2000000
50 /* Number of TAP tests in this file */
/*
 * Size in bytes of the anonymous mapping created for every stream in
 * attach_session(); get_data_packet() reports packets larger than this.
 */
52 #define mmap_size 524288
/*
 * Not referenced anywhere in the visible part of this file; presumably these
 * satisfy symbols expected by linked common code -- TODO confirm.
 */
54 int ust_consumerd32_fd
;
55 int ust_consumerd64_fd
;
/* Socket opened by connect_viewer() and used for every viewer command. */
57 static int control_sock
;
/* State of the attached session; allocated by attach_session(). */
58 struct live_session
*session
;
/*
 * Index information of the first packet recorded by get_next_index().
 * main() later hands first_packet_stream_id and first_packet_offset to
 * get_data_packet(). first_packet_stream_id stays negative until that
 * first index has been recorded.
 */
60 static int first_packet_offset
;
61 static int first_packet_len
;
62 static int first_packet_stream_id
= -1;
/*
 * Per-stream state kept by this test. Only some members are visible in this
 * excerpt; other code in the file also accesses id, fd, first_read,
 * mmap_base and metadata_flag on elements of this type.
 */
64 struct viewer_stream
{
66 uint64_t ctf_trace_id
;
/*
 * streams and stream_count below are reached through the global session
 * pointer (struct live_session) elsewhere in the file: streams is the array
 * allocated in attach_session() and stream_count its number of elements.
 */
75 struct viewer_stream
*streams
;
76 uint64_t live_timer_interval
;
77 uint64_t stream_count
;
/*
 * Receive data from socket fd into buf, asking for at most len bytes.
 *
 * recv() is called in a loop: each call writes at offset copied inside buf
 * and requests to_copy bytes (initially len).
 */
81 ssize_t
lttng_live_recv(int fd
, void *buf
, size_t len
)
84 size_t copied
= 0, to_copy
= len
;
/*
 * NOTE(review): buf is a void pointer, so buf + copied relies on the GNU C
 * void-pointer arithmetic extension.
 */
87 ret
= recv(fd
, buf
+ copied
, to_copy
, 0);
/* recv() must never hand back more bytes than were requested. */
89 assert(ret
<= to_copy
);
/*
 * Keep looping while data was received and bytes remain, and retry when
 * recv() was interrupted by a signal (EINTR).
 */
93 } while ((ret
> 0 && to_copy
> 0)
94 || (ret
< 0 && errno
== EINTR
));
97 /* ret = 0 means orderly shutdown, ret < 0 is error. */
/*
 * Send len bytes from buf on socket fd.
 *
 * MSG_NOSIGNAL asks send() not to raise SIGPIPE when the peer has closed the
 * connection. The call is retried for as long as it fails with EINTR; a
 * short write (fewer than len bytes accepted) is not resent by this loop.
 */
102 ssize_t
lttng_live_send(int fd
, const void *buf
, size_t len
)
107 ret
= send(fd
, buf
, len
, MSG_NOSIGNAL
);
108 } while (ret
< 0 && errno
== EINTR
);
/*
 * Open the TCP connection to the relay daemon used by the viewer commands.
 *
 * hostname is resolved with gethostbyname(); the first address returned is
 * connected to on port 5344 and the resulting socket is stored in the global
 * control_sock.
 */
113 int connect_viewer(char *hostname
)
115 struct hostent
*host
;
116 struct sockaddr_in server_addr
;
119 host
= gethostbyname(hostname
);
/* IPv4 stream socket; kept in the file-scope control_sock. */
125 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
/* Destination: resolved host, port 5344 in network byte order. */
131 server_addr
.sin_family
= AF_INET
;
132 server_addr
.sin_port
= htons(5344);
133 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
134 bzero(&(server_addr
.sin_zero
), 8);
/*
 * NOTE(review): the length passed is sizeof(struct sockaddr) rather than
 * sizeof(server_addr).
 */
136 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
137 sizeof(struct sockaddr
)) == -1) {
/*
 * The address is filled in a second time, now with port 5345. What this
 * second address is used for is not visible in this excerpt -- TODO confirm.
 */
143 server_addr
.sin_family
= AF_INET
;
144 server_addr
.sin_port
= htons(5345);
145 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
146 bzero(&(server_addr
.sin_zero
), 8);
/*
 * Perform the LTTNG_VIEWER_CONNECT handshake on control_sock.
 *
 * A command header is sent first, followed by a lttng_viewer_connect payload
 * carrying VERSION_MAJOR, VERSION_MINOR and the client-command connection
 * type. The reply is then read back into the same connect structure.
 * All header and payload fields are converted to big-endian before sending.
 */
154 int establish_connection(void)
156 struct lttng_viewer_cmd cmd
;
/* NOTE(review): this local shadows the connect() socket function. */
157 struct lttng_viewer_connect connect
;
/* Command header: payload size is that of the connect structure. */
160 cmd
.cmd
= htobe32(LTTNG_VIEWER_CONNECT
);
161 cmd
.data_size
= htobe64(sizeof(connect
));
162 cmd
.cmd_version
= htobe32(0);
/* Zero the payload so fields not set below are sent as zero. */
164 memset(&connect
, 0, sizeof(connect
));
165 connect
.major
= htobe32(VERSION_MAJOR
);
166 connect
.minor
= htobe32(VERSION_MINOR
);
167 connect
.type
= htobe32(LTTNG_VIEWER_CLIENT_COMMAND
);
/* Header first, then payload. */
169 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
171 diag("Error sending cmd");
174 ret_len
= lttng_live_send(control_sock
, &connect
, sizeof(connect
));
176 diag("Error sending version");
/* The reply overwrites the request payload that was just sent. */
180 ret_len
= lttng_live_recv(control_sock
, &connect
, sizeof(connect
));
182 diag("[error] Remote side has closed connection");
186 diag("Error receiving version");
/*
 * list_sessions: send LTTNG_VIEWER_LIST_SESSIONS (no payload), read the list
 * header, then read one lttng_viewer_session record per announced session.
 * The id of the first record that reports streams is stored in *session_id.
 */
196 * Returns the number of sessions, should be 1 during the unit test.
198 int list_sessions(uint64_t *session_id
)
200 struct lttng_viewer_cmd cmd
;
201 struct lttng_viewer_list_sessions list
;
202 struct lttng_viewer_session lsession
;
/*
 * NOTE(review): declared int but assigned a be64toh() result below, so an
 * id wider than int would be truncated.
 */
205 int first_session
= 0;
207 cmd
.cmd
= htobe32(LTTNG_VIEWER_LIST_SESSIONS
);
208 cmd
.data_size
= htobe64(0);
209 cmd
.cmd_version
= htobe32(0);
211 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
213 diag("Error sending cmd");
217 ret_len
= lttng_live_recv(control_sock
, &list
, sizeof(list
));
219 diag("[error] Remote side has closed connection");
223 diag("Error receiving session list");
/* One fixed-size session record follows the header per session. */
227 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
228 ret_len
= lttng_live_recv(control_sock
, &lsession
, sizeof(lsession
));
230 diag("Error receiving session");
/*
 * Remember only the first session with streams. lsession.streams is
 * tested without byte-order conversion. NOTE(review): because the guard
 * is first_session <= 0, a selected session whose id is 0 would be
 * replaced by a later one.
 */
233 if (lsession
.streams
> 0 && first_session
<= 0) {
234 first_session
= be64toh(lsession
.id
);
235 *session_id
= first_session
;
/* Session count as announced by the relay daemon, in host byte order. */
239 return be32toh(list
.sessions_count
);
/*
 * Send LTTNG_VIEWER_CREATE_SESSION (no payload) on control_sock and read the
 * lttng_viewer_create_session_response. A status other than
 * LTTNG_VIEWER_CREATE_SESSION_OK is reported through diag().
 * main() expects this function to return 0.
 */
245 int create_viewer_session(void)
247 struct lttng_viewer_cmd cmd
;
248 struct lttng_viewer_create_session_response resp
;
251 cmd
.cmd
= htobe32(LTTNG_VIEWER_CREATE_SESSION
);
252 cmd
.data_size
= htobe64(0);
253 cmd
.cmd_version
= htobe32(0);
255 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
257 diag("[error] Error sending cmd");
/* The whole header must have been sent. */
260 assert(ret_len
== sizeof(cmd
));
262 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
264 diag("[error] Remote side has closed connection");
268 diag("[error] Error receiving create session reply");
/* The whole response must have been received. */
271 assert(ret_len
== sizeof(resp
));
/* Status arrives in big-endian. */
273 if (be32toh(resp
.status
) != LTTNG_VIEWER_CREATE_SESSION_OK
) {
274 diag("[error] Error creating viewer session");
/*
 * Attach to the session identified by id and set up local per-stream state.
 *
 * Sends LTTNG_VIEWER_ATTACH_SESSION with a request asking to seek to the
 * beginning, reads the response, then reads one lttng_viewer_stream record
 * per announced stream. For each stream the id, trace id and metadata flag
 * are stored and an anonymous private mapping of mmap_size bytes is created.
 *
 * The visible success path returns session->stream_count.
 */
283 int attach_session(uint64_t id
)
285 struct lttng_viewer_cmd cmd
;
286 struct lttng_viewer_attach_session_request rq
;
287 struct lttng_viewer_attach_session_response rp
;
288 struct lttng_viewer_stream stream
;
/*
 * The global session pointer is overwritten on every call (main() calls
 * this function twice). NOTE(review): no release of a previous allocation
 * is visible in this excerpt.
 */
292 session
= zmalloc(sizeof(struct live_session
));
297 cmd
.cmd
= htobe32(LTTNG_VIEWER_ATTACH_SESSION
);
298 cmd
.data_size
= htobe64(sizeof(rq
));
299 cmd
.cmd_version
= htobe32(0);
/* Zero the request so fields not set below are sent as zero. */
301 memset(&rq
, 0, sizeof(rq
));
302 rq
.session_id
= htobe64(id
);
303 rq
.seek
= htobe32(LTTNG_VIEWER_SEEK_BEGINNING
);
305 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
307 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
310 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
312 diag("Error sending attach request");
316 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
318 diag("[error] Remote side has closed connection");
322 diag("Error receiving attach response");
325 if (be32toh(rp
.status
) != LTTNG_VIEWER_ATTACH_OK
) {
/* Number of stream records that follow the response. */
329 session
->stream_count
= be32toh(rp
.streams_count
);
330 if (session
->stream_count
== 0) {
331 diag("Got session stream count == 0");
/* zmalloc() is used, so every viewer_stream element starts zeroed. */
334 session
->streams
= zmalloc(session
->stream_count
*
335 sizeof(struct viewer_stream
));
336 if (!session
->streams
) {
340 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
341 ret_len
= lttng_live_recv(control_sock
, &stream
, sizeof(stream
));
343 diag("[error] Remote side has closed connection");
347 diag("Error receiving stream");
/* Convert the wire (big-endian) identifiers to host byte order. */
350 session
->streams
[i
].id
= be64toh(stream
.id
);
352 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
353 session
->streams
[i
].first_read
= 1;
/* Receive buffer for this stream; get_data_packet() reads packets into it. */
354 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
,
355 PROT_READ
| PROT_WRITE
,
356 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
357 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
/* Remember which stream carries metadata; get_metadata() looks for it. */
362 if (be32toh(stream
.metadata_flag
)) {
363 session
->streams
[i
].metadata_flag
= 1;
366 return session
->stream_count
;
/*
 * Request the metadata of the attached session.
 *
 * Looks through session->streams for an entry with metadata_flag set, sends
 * LTTNG_VIEWER_GET_METADATA for that stream's id, reads the response header
 * and then the payload whose length the header announces.
 *
 * main() reports the return value as the number of bytes received and
 * expects it to be greater than 0.
 */
372 int get_metadata(void)
374 struct lttng_viewer_cmd cmd
;
375 struct lttng_viewer_get_metadata rq
;
376 struct lttng_viewer_metadata_packet rp
;
/* Index into session->streams (not the protocol stream id); -1 = not found. */
382 int metadata_stream_id
= -1;
384 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_METADATA
);
385 cmd
.data_size
= htobe64(sizeof(rq
));
386 cmd
.cmd_version
= htobe32(0);
388 for (i
= 0; i
< session
->stream_count
; i
++) {
389 if (session
->streams
[i
].metadata_flag
) {
390 metadata_stream_id
= i
;
395 if (metadata_stream_id
< 0) {
396 diag("No metadata stream found");
/* The request carries the protocol id of the metadata stream. */
400 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
402 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
404 diag("Error sending cmd");
407 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
409 diag("Error sending get_metadata request");
412 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
414 diag("[error] Remote side has closed connection");
418 diag("Error receiving metadata response");
/* Dispatch on the reply status (big-endian on the wire). */
421 switch (be32toh(rp
.status
)) {
422 case LTTNG_VIEWER_METADATA_OK
:
424 case LTTNG_VIEWER_NO_NEW_METADATA
:
425 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
428 case LTTNG_VIEWER_METADATA_ERR
:
429 diag("Got LTTNG_VIEWER_METADATA_ERR:");
432 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
/* Payload length announced by the response header. */
436 len
= be64toh(rp
.len
);
443 PERROR("relay data zmalloc");
/* Read the metadata payload itself; failures jump to error_free_data. */
446 ret_len
= lttng_live_recv(control_sock
, data
, len
);
448 diag("[error] Remote side has closed connection");
449 goto error_free_data
;
452 diag("Error receiving trace packet");
453 goto error_free_data
;
/*
 * Ask the relay daemon for the next index of the session's streams.
 *
 * For each stream a LTTNG_VIEWER_GET_NEXT_INDEX request is sent and the
 * lttng_viewer_index reply is read and dispatched on its status. The first
 * time an index is recorded, its offset, length and stream index are saved
 * in the first_packet_* globals. main() expects this function to return 0.
 */
466 int get_next_index(void)
468 struct lttng_viewer_cmd cmd
;
469 struct lttng_viewer_get_next_index rq
;
470 struct lttng_viewer_index rp
;
/* The same command header is reused for every stream. */
474 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_NEXT_INDEX
);
475 cmd
.data_size
= htobe64(sizeof(rq
));
476 cmd
.cmd_version
= htobe32(0);
478 for (id
= 0; id
< session
->stream_count
; id
++) {
/*
 * Metadata streams are singled out here; the body of this branch is not
 * visible in this excerpt (presumably the stream is skipped -- confirm).
 */
479 if (session
->streams
[id
].metadata_flag
) {
482 memset(&rq
, 0, sizeof(rq
));
483 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
486 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
488 diag("Error sending cmd");
491 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
493 diag("Error sending get_next_index request");
496 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
498 diag("[error] Remote side has closed connection");
502 diag("Error receiving index response");
/* flags is converted in place; the other fields stay big-endian. */
506 rp
.flags
= be32toh(rp
.flags
);
508 switch (be32toh(rp
.status
)) {
509 case LTTNG_VIEWER_INDEX_INACTIVE
:
510 /* Skip this stream. */
511 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
513 case LTTNG_VIEWER_INDEX_OK
:
515 case LTTNG_VIEWER_INDEX_RETRY
:
/* On hang-up the stream's id and fd are overwritten with -1 markers. */
518 case LTTNG_VIEWER_INDEX_HUP
:
519 diag("Got LTTNG_VIEWER_INDEX_HUP");
520 session
->streams
[id
].id
= -1ULL;
521 session
->streams
[id
].fd
= -1;
523 case LTTNG_VIEWER_INDEX_ERR
:
524 diag("Got LTTNG_VIEWER_INDEX_ERR");
527 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)", be32toh(rp
.status
));
/* Only the first recorded index is kept (stream id still negative). */
530 if (first_packet_stream_id
< 0) {
532 * Initialize the first packet stream id. That is,
533 * the first active stream encoutered.
/*
 * NOTE(review): the first_packet_* globals are int while be64toh() yields
 * 64-bit values, so large offsets or sizes would be truncated.
 * packet_size is divided by CHAR_BIT, i.e. presumably converted from bits
 * to bytes -- confirm against the index ABI.
 */
535 first_packet_offset
= be64toh(rp
.offset
);
536 first_packet_len
= be64toh(rp
.packet_size
) / CHAR_BIT
;
537 first_packet_stream_id
= id
;
538 diag("Got first packet index with offset %d and len %d",
539 first_packet_offset
, first_packet_len
);
/*
 * Fetch one trace packet of stream session->streams[id] from the relay
 * daemon into that stream's mmap_base buffer.
 *
 * Sends LTTNG_VIEWER_GET_PACKET, reads the lttng_viewer_trace_packet reply
 * header and dispatches on its status. The packet body is then read with
 * the length taken from the reply, after checking it against mmap_size.
 *
 * id is an index into session->streams. The remainder of the parameter
 * list is not visible in this excerpt; a len value is used below.
 */
549 int get_data_packet(int id
, uint64_t offset
,
552 struct lttng_viewer_cmd cmd
;
553 struct lttng_viewer_get_packet rq
;
554 struct lttng_viewer_trace_packet rp
;
557 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_PACKET
);
558 cmd
.data_size
= htobe64(sizeof(rq
));
559 cmd
.cmd_version
= htobe32(0);
561 memset(&rq
, 0, sizeof(rq
));
562 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
/* The statement the next comment refers to is not visible in this excerpt. */
563 /* Already in big endian. */
565 rq
.len
= htobe32(len
);
567 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
569 diag("Error sending cmd");
572 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
574 diag("Error sending get_data_packet request");
577 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
579 diag("[error] Remote side has closed connection");
583 diag("Error receiving data response");
/* flags is converted in place so it can be tested against flag bits below. */
586 rp
.flags
= be32toh(rp
.flags
);
588 switch (be32toh(rp
.status
)) {
/* On success len is replaced by the length announced in the reply. */
589 case LTTNG_VIEWER_GET_PACKET_OK
:
590 len
= be32toh(rp
.len
);
592 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
596 case LTTNG_VIEWER_GET_PACKET_RETRY
:
597 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
/* An error reply flagged NEW_METADATA gets its own diagnostic. */
599 case LTTNG_VIEWER_GET_PACKET_ERR
:
600 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
601 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
604 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
607 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
/* The receive buffer is a fixed mmap_size bytes mapping. */
611 if (len
> mmap_size
) {
612 diag("mmap_size not big enough");
/* Read the packet body into the stream's mapping. */
616 ret_len
= lttng_live_recv(control_sock
, session
->streams
[id
].mmap_base
, len
);
618 diag("[error] Remote side has closed connection");
622 diag("Error receiving trace packet");
/*
 * Detach from the session identified by id.
 *
 * Sends LTTNG_VIEWER_DETACH_SESSION with the session id, reads the
 * lttng_viewer_detach_session_response and checks its status against
 * LTTNG_VIEWER_DETACH_SESSION_OK. main() expects this function to return 0.
 *
 * Unlike the other helpers in this file, errors are written with
 * fprintf(stderr, ...) instead of diag().
 */
631 int detach_viewer_session(uint64_t id
)
633 struct lttng_viewer_cmd cmd
;
634 struct lttng_viewer_detach_session_response resp
;
635 struct lttng_viewer_detach_session_request rq
;
639 cmd
.cmd
= htobe32(LTTNG_VIEWER_DETACH_SESSION
);
640 cmd
.data_size
= htobe64(sizeof(rq
));
641 cmd
.cmd_version
= htobe32(0);
/* Zero the request so fields not set below are sent as zero. */
643 memset(&rq
, 0, sizeof(rq
));
644 rq
.session_id
= htobe64(id
);
646 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
648 fprintf(stderr
, "[error] Error sending cmd\n");
653 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
/* NOTE(review): this message says "attach" although a detach request is being sent. */
655 fprintf(stderr
, "Error sending attach request\n");
660 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
662 fprintf(stderr
, "[error] Error receiving detach session reply\n");
667 if (be32toh(resp
.status
) != LTTNG_VIEWER_DETACH_SESSION_OK
) {
668 fprintf(stderr
, "[error] Error detaching viewer session\n");
/*
 * TAP test driver for the live viewer protocol.
 *
 * Runs the steps in order, recording each result with ok(): connect to the
 * relay daemon, handshake, list sessions, create a viewer session, attach,
 * fetch metadata, fetch indexes, fetch one data packet, detach, then list
 * and attach again. The process exit code comes from exit_status().
 */
678 int main(int argc
, char **argv
)
683 plan_tests(NUM_TESTS
);
685 diag("Live unit tests");
687 ret
= connect_viewer("localhost");
688 ok(ret
== 0, "Connect viewer to relayd");
690 ret
= establish_connection();
691 ok(ret
== 0, "Established connection and version check with %d.%d",
692 VERSION_MAJOR
, VERSION_MINOR
);
/* list_sessions() also stores the id of the session to attach to. */
694 ret
= list_sessions(&session_id
);
695 ok(ret
> 0, "List sessions : %d session(s)", ret
);
697 ret
= create_viewer_session();
698 ok(ret
== 0, "Create viewer session");
700 ret
= attach_session(session_id
);
701 ok(ret
> 0, "Attach to session, %d stream(s) received", ret
);
703 ret
= get_metadata();
704 ok(ret
> 0, "Get metadata, received %d bytes", ret
);
706 ret
= get_next_index();
707 ok(ret
== 0, "Get one index per stream");
/* Uses the first packet position recorded by get_next_index(). */
709 ret
= get_data_packet(first_packet_stream_id
, first_packet_offset
,
712 "Get one data packet for stream %d, offset %d, len %d",
713 first_packet_stream_id
, first_packet_offset
,
716 ret
= detach_viewer_session(session_id
);
717 ok(ret
== 0, "Detach viewer session");
/* After detaching, listing and attaching are exercised a second time. */
719 ret
= list_sessions(&session_id
);
720 ok(ret
> 0, "List sessions : %d session(s)", ret
);
722 ret
= attach_session(session_id
);
723 ok(ret
> 0, "Attach to session, %d streams received", ret
);
725 return exit_status();
);
725 return exit_status();