2 * Copyright (c) - 2013 Julien Desfossez <jdesfossez@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as
6 * published by the Free Software Foundation; only version 2 of the License.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
26 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
37 #include <lttng/lttng.h>
39 #include <urcu/list.h>
40 #include <bin/lttng-sessiond/session.h>
41 #include <common/common.h>
43 #include <bin/lttng-relayd/lttng-viewer-abi.h>
44 #include <common/index/ctf-index.h>
/*
 * Test parameters and file-scope state shared by the helpers below.
 *
 * NOTE(review): the embedded original line numbers skip values, so some
 * lines are not visible here; for example NUM_TESTS, which main() passes to
 * plan_tests(), has no visible definition.
 */
46 #define SESSION1 "test1"
47 #define RELAYD_URL "net://localhost"
48 #define LIVE_TIMER 2000000
50 /* Number of TAP tests in this file */
/*
 * mmap_size: byte size of the anonymous mapping attach_session() creates for
 * each stream, and the upper bound get_data_packet() checks a payload length
 * against.
 */
52 #define mmap_size 524288
54 int ust_consumerd32_fd
;
55 int ust_consumerd64_fd
;
/* Socket created in connect_viewer(); used by every send()/recv() below. */
57 static int control_sock
;
/* Allocated in attach_session(); holds the stream array and stream count. */
58 struct live_session
*session
;
/* Recorded by get_next_index() and passed by main() to get_data_packet(). */
60 static int first_packet_offset
;
61 static int first_packet_len
;
62 static int first_packet_stream_id
;
/*
 * Viewer-side bookkeeping types.
 *
 * Only part of the member list is visible in this listing. The functions
 * below also access id, mmap_base, fd, metadata_flag and first_read on
 * struct viewer_stream elements; those members are presumably declared on
 * lines not shown here - TODO confirm.
 */
64 struct viewer_stream
{
66 uint64_t ctf_trace_id
;
/*
 * The members below are accessed through the file-scope `session` pointer
 * (struct live_session *), so they presumably belong to struct live_session,
 * whose opening line is not visible here - TODO confirm.
 */
75 struct viewer_stream
*streams
;
76 uint64_t live_timer_interval
;
77 uint64_t stream_count
;
/*
 * Resolve `hostname` with gethostbyname(), create an AF_INET/SOCK_STREAM
 * socket stored in the file-scope control_sock, and connect() it to port
 * 5344 of the resolved address.
 *
 * After the connect() call, server_addr is filled in a second time with port
 * 5345; no use of that second address is visible in this listing.
 *
 * main() treats a return value of 0 as success.
 *
 * NOTE(review): host is dereferenced (host->h_addr); confirm it is checked
 * for NULL on the lines not shown here.
 */
81 int connect_viewer(char *hostname
)
84 struct sockaddr_in server_addr
;
87 host
= gethostbyname(hostname
);
93 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
/* First address: port 5344, the one passed to connect() below. */
99 server_addr
.sin_family
= AF_INET
;
100 server_addr
.sin_port
= htons(5344);
101 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
102 bzero(&(server_addr
.sin_zero
), 8);
104 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
105 sizeof(struct sockaddr
)) == -1) {
/* Second address: port 5345; not used by any visible call. */
111 server_addr
.sin_family
= AF_INET
;
112 server_addr
.sin_port
= htons(5345);
113 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
114 bzero(&(server_addr
.sin_zero
), 8);
/*
 * Perform the initial handshake on control_sock.
 *
 * Sends a struct lttng_viewer_cmd whose cmd is LTTNG_VIEWER_CONNECT, then a
 * zeroed struct lttng_viewer_connect carrying VERSION_MAJOR, VERSION_MINOR
 * and type LTTNG_VIEWER_CLIENT_COMMAND (each converted with htobe32()), and
 * finally reads the peer's struct lttng_viewer_connect into the same buffer.
 *
 * The visible `} while (ret < 0 && errno == EINTR)` tails show that each
 * send()/recv() is repeated while it fails with EINTR.
 *
 * main() treats a return value of 0 as success.
 *
 * NOTE(review): cmd.cmd is converted with htobe32() but cmd.data_size is
 * assigned sizeof(connect) with no byte-order conversion - confirm against
 * the wire format of that field.
 * NOTE(review): the local variable `connect` shadows connect(2) inside this
 * function.
 * NOTE(review): recv() is called with flags 0, so a short read is possible;
 * confirm the byte count is checked on the lines not shown.
 */
122 int establish_connection(void)
124 struct lttng_viewer_cmd cmd
;
125 struct lttng_viewer_connect connect
;
128 cmd
.cmd
= htobe32(LTTNG_VIEWER_CONNECT
);
129 cmd
.data_size
= sizeof(connect
);
132 memset(&connect
, 0, sizeof(connect
));
133 connect
.major
= htobe32(VERSION_MAJOR
);
134 connect
.minor
= htobe32(VERSION_MINOR
);
135 connect
.type
= htobe32(LTTNG_VIEWER_CLIENT_COMMAND
);
/* Command header first, then the connect payload. */
138 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
139 } while (ret
< 0 && errno
== EINTR
);
141 fprintf(stderr
, "Error sending cmd\n");
145 ret
= send(control_sock
, &connect
, sizeof(connect
), 0);
146 } while (ret
< 0 && errno
== EINTR
);
148 fprintf(stderr
, "Error sending version\n");
/* The reply overwrites the request buffer. */
153 ret
= recv(control_sock
, &connect
, sizeof(connect
), 0);
154 } while (ret
< 0 && errno
== EINTR
);
156 fprintf(stderr
, "Error receiving version\n");
/*
 * Send LTTNG_VIEWER_LIST_SESSIONS on control_sock, read the
 * struct lttng_viewer_list_sessions header, then read
 * be32toh(list.sessions_count) struct lttng_viewer_session records.
 *
 * For a record whose `streams` field is greater than 0 while first_session
 * is still <= 0, be64toh(lsession.id) is stored in first_session and copied
 * to *session_id. The visible code then assigns
 * ret = be32toh(list.sessions_count).
 *
 * NOTE(review): be64toh(lsession.id) is narrowed into an int (first_session
 * and *session_id).
 * NOTE(review): lsession.streams is compared without a byte-order conversion
 * while the other received fields are converted - confirm its wire format.
 * NOTE(review): no assignment to cmd.data_size is visible in this function;
 * confirm it is initialised on a line not shown.
 */
166 * Returns the number of sessions, should be 1 during the unit test.
168 int list_sessions(int *session_id
)
170 struct lttng_viewer_cmd cmd
;
171 struct lttng_viewer_list_sessions list
;
172 struct lttng_viewer_session lsession
;
174 int first_session
= 0;
176 cmd
.cmd
= htobe32(LTTNG_VIEWER_LIST_SESSIONS
);
181 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
182 } while (ret
< 0 && errno
== EINTR
);
184 fprintf(stderr
, "Error sending cmd\n");
189 ret
= recv(control_sock
, &list
, sizeof(list
), 0);
190 } while (ret
< 0 && errno
== EINTR
);
192 fprintf(stderr
, "Error receiving session list\n");
/* One fixed-size session record per announced session. */
196 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
198 ret
= recv(control_sock
, &lsession
, sizeof(lsession
), 0);
199 } while (ret
< 0 && errno
== EINTR
);
201 fprintf(stderr
, "Error receiving session\n");
204 if (lsession
.streams
> 0 && first_session
<= 0) {
205 first_session
= be64toh(lsession
.id
);
206 *session_id
= first_session
;
210 ret
= be32toh(list
.sessions_count
);
/*
 * Attach to the session identified by `id`.
 *
 * Allocates the file-scope `session` with zmalloc(), sends
 * LTTNG_VIEWER_ATTACH_SESSION followed by a request carrying htobe64(id) and
 * seek LTTNG_VIEWER_SEEK_BEGINNING, then reads the attach response and
 * compares be32toh(rp.status) with LTTNG_VIEWER_ATTACH_OK.
 *
 * session->stream_count is set from be32toh(rp.streams_count) and an array
 * of that many struct viewer_stream is allocated with zmalloc(). For each
 * struct lttng_viewer_stream record received, the stream's id and
 * ctf_trace_id are stored after be64toh(), first_read is set to 1, mmap_base
 * is set to a new mmap_size-byte MAP_PRIVATE | MAP_ANONYMOUS mapping with
 * PROT_READ | PROT_WRITE, and metadata_flag is set to 1 when the record's
 * metadata_flag is non-zero.
 *
 * The visible code assigns ret = session->stream_count; main() treats a
 * value greater than 0 as success.
 *
 * NOTE(review): cmd.cmd is converted with htobe32() but cmd.data_size is
 * assigned sizeof(rq) with no byte-order conversion - confirm against the
 * wire format of that field.
 * NOTE(review): session->stream_count comes from the peer and is multiplied
 * by sizeof(struct viewer_stream) with no visible overflow check.
 */
216 int attach_session(int id
)
218 struct lttng_viewer_cmd cmd
;
219 struct lttng_viewer_attach_session_request rq
;
220 struct lttng_viewer_attach_session_response rp
;
221 struct lttng_viewer_stream stream
;
224 session
= zmalloc(sizeof(struct live_session
));
230 cmd
.cmd
= htobe32(LTTNG_VIEWER_ATTACH_SESSION
);
231 cmd
.data_size
= sizeof(rq
);
234 memset(&rq
, 0, sizeof(rq
));
235 rq
.session_id
= htobe64(id
);
236 rq
.seek
= htobe32(LTTNG_VIEWER_SEEK_BEGINNING
);
239 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
240 } while (ret
< 0 && errno
== EINTR
);
242 fprintf(stderr
, "Error sending cmd\n");
246 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
247 } while (ret
< 0 && errno
== EINTR
);
249 fprintf(stderr
, "Error sending attach request\n");
254 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
255 } while (ret
< 0 && errno
== EINTR
);
257 fprintf(stderr
, "Error receiving attach response\n");
260 if (be32toh(rp
.status
) != LTTNG_VIEWER_ATTACH_OK
) {
265 session
->stream_count
= be32toh(rp
.streams_count
);
266 session
->streams
= zmalloc(session
->stream_count
*
267 sizeof(struct viewer_stream
));
268 if (!session
->streams
) {
/* One stream record follows the response for every announced stream. */
273 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
275 ret
= recv(control_sock
, &stream
, sizeof(stream
), 0);
276 } while (ret
< 0 && errno
== EINTR
);
278 fprintf(stderr
, "Error receiving stream\n");
281 session
->streams
[i
].id
= be64toh(stream
.id
);
283 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
284 session
->streams
[i
].first_read
= 1;
/* Per-stream buffer that get_data_packet() later receives payloads into. */
285 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
, PROT_READ
| PROT_WRITE
,
286 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
287 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
288 fprintf(stderr
, "mmap error\n");
293 if (be32toh(stream
.metadata_flag
)) {
294 session
->streams
[i
].metadata_flag
= 1;
297 ret
= session
->stream_count
;
/*
 * Request the metadata of the attached session.
 *
 * Looks through session->streams for an entry with metadata_flag set and
 * records its index in metadata_stream_id; prints "No metadata stream found"
 * when that index is still negative. Sends LTTNG_VIEWER_GET_METADATA
 * followed by a request carrying htobe64() of that stream's id, reads a
 * struct lttng_viewer_metadata_packet and switches on be32toh(rp.status).
 * len is then taken from be64toh(rp.len) and that many bytes are received
 * into `data` with MSG_WAITALL.
 *
 * main() treats a return value greater than 0 as success and prints it as
 * the number of bytes received.
 *
 * NOTE(review): cmd.cmd is converted with htobe32() but cmd.data_size is
 * assigned sizeof(rq) with no byte-order conversion - confirm against the
 * wire format of that field.
 * NOTE(review): len is supplied by the peer and used directly as the receive
 * length; the allocation and release of `data` are not visible here -
 * confirm the buffer is sized from len and freed on every path.
 */
304 int get_metadata(void)
306 struct lttng_viewer_cmd cmd
;
307 struct lttng_viewer_get_metadata rq
;
308 struct lttng_viewer_metadata_packet rp
;
313 int metadata_stream_id
= -1;
315 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_METADATA
);
316 cmd
.data_size
= sizeof(rq
);
319 for (i
= 0; i
< session
->stream_count
; i
++) {
320 if (session
->streams
[i
].metadata_flag
) {
321 metadata_stream_id
= i
;
326 if (metadata_stream_id
< 0) {
327 fprintf(stderr
, "No metadata stream found\n");
332 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
335 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
336 } while (ret
< 0 && errno
== EINTR
);
338 fprintf(stderr
, "Error sending cmd\n");
342 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
343 } while (ret
< 0 && errno
== EINTR
);
345 fprintf(stderr
, "Error sending get_metadata request\n");
349 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
350 } while (ret
< 0 && errno
== EINTR
);
352 fprintf(stderr
, "Error receiving metadata response\n");
355 switch (be32toh(rp
.status
)) {
356 case LTTNG_VIEWER_METADATA_OK
:
358 case LTTNG_VIEWER_NO_NEW_METADATA
:
359 fprintf(stderr
, "NO NEW\n");
362 case LTTNG_VIEWER_METADATA_ERR
:
363 fprintf(stderr
, "ERR\n");
367 fprintf(stderr
, "UNKNOWN\n");
/* Payload length announced by the peer. */
372 len
= be64toh(rp
.len
);
379 perror("relay data zmalloc");
383 ret
= recv(control_sock
, data
, len
, MSG_WAITALL
);
384 } while (ret
< 0 && errno
== EINTR
);
386 fprintf(stderr
, "Error receiving trace packet\n");
/*
 * Ask for the next index of the streams in session->streams.
 *
 * Loops `id` over session->stream_count entries. Entries with metadata_flag
 * set are tested at the top of the loop (the branch body is not visible;
 * presumably they are skipped - TODO confirm). For a stream, sends
 * LTTNG_VIEWER_GET_NEXT_INDEX followed by a request carrying htobe64() of the
 * stream's id, reads a struct lttng_viewer_index, converts rp.flags in place
 * with be32toh() and switches on be32toh(rp.status). On
 * LTTNG_VIEWER_INDEX_HUP the stream's id is set to -1ULL and its fd to -1.
 *
 * When first_packet_stream_id is 0, first_packet_offset is set from
 * be64toh(rp.offset), first_packet_len from
 * be64toh(rp.packet_size) / CHAR_BIT, and first_packet_stream_id to the
 * current index.
 *
 * main() treats a return value of 0 as success.
 *
 * NOTE(review): `!first_packet_stream_id` uses 0 as "nothing recorded yet",
 * but 0 is also a valid stream index; if index 0 is the one recorded, a later
 * stream would overwrite the recorded values - confirm intent.
 * NOTE(review): the be64toh() results are narrowed into int statics.
 * NOTE(review): cmd.cmd is converted with htobe32() but cmd.data_size is
 * assigned sizeof(rq) with no byte-order conversion - confirm against the
 * wire format of that field.
 */
398 int get_next_index(void)
400 struct lttng_viewer_cmd cmd
;
401 struct lttng_viewer_get_next_index rq
;
402 struct lttng_viewer_index rp
;
406 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_NEXT_INDEX
);
407 cmd
.data_size
= sizeof(rq
);
410 for (id
= 0; id
< session
->stream_count
; id
++) {
411 if (session
->streams
[id
].metadata_flag
) {
414 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
418 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
419 } while (ret
< 0 && errno
== EINTR
);
421 fprintf(stderr
, "Error sending cmd\n");
425 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
426 } while (ret
< 0 && errno
== EINTR
);
428 fprintf(stderr
, "Error sending get_next_index request\n");
432 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
433 } while (ret
< 0 && errno
== EINTR
);
435 fprintf(stderr
, "Error receiving index response\n");
/* flags is converted in place; status is converted at the switch. */
439 rp
.flags
= be32toh(rp
.flags
);
441 switch (be32toh(rp
.status
)) {
442 case LTTNG_VIEWER_INDEX_INACTIVE
:
443 fprintf(stderr
, "(INACTIVE)\n");
445 case LTTNG_VIEWER_INDEX_OK
:
447 case LTTNG_VIEWER_INDEX_RETRY
:
450 case LTTNG_VIEWER_INDEX_HUP
:
451 fprintf(stderr
, "(HUP)\n");
452 session
->streams
[id
].id
= -1ULL;
453 session
->streams
[id
].fd
= -1;
455 case LTTNG_VIEWER_INDEX_ERR
:
456 fprintf(stderr
, "(ERR)\n");
460 fprintf(stderr
, "SHOULD NOT HAPPEN\n");
/* Remember one packet's location so main() can fetch it afterwards. */
464 if (!first_packet_stream_id
) {
465 first_packet_offset
= be64toh(rp
.offset
);
466 first_packet_len
= be64toh(rp
.packet_size
) / CHAR_BIT
;
467 first_packet_stream_id
= id
;
/*
 * Fetch one trace packet for session->streams[id].
 *
 * Sends LTTNG_VIEWER_GET_PACKET followed by a request carrying htobe64() of
 * the stream's id and htobe32(len), reads a
 * struct lttng_viewer_trace_packet, converts rp.flags in place with
 * be32toh() and switches on be32toh(rp.status). The payload length is then
 * taken from be32toh(rp.len); "mmap_size not big enough" is printed when it
 * exceeds mmap_size. The payload is received into the stream's mmap_base
 * with MSG_WAITALL.
 *
 * main() compares the return value with first_packet_len.
 *
 * NOTE(review): the remainder of the parameter list is not visible in this
 * listing; `len` is used below and is presumably the third parameter.
 * NOTE(review): the existing "Already in big endian" comment presumably
 * refers to the rq.offset assignment (not visible here), but main() passes
 * first_packet_offset, which get_next_index() stores after be64toh(), i.e.
 * in host byte order - confirm whether a conversion is missing.
 * NOTE(review): cmd.cmd is converted with htobe32() but cmd.data_size is
 * assigned sizeof(rq) with no byte-order conversion - confirm against the
 * wire format of that field.
 */
477 int get_data_packet(int id
, uint64_t offset
,
480 struct lttng_viewer_cmd cmd
;
481 struct lttng_viewer_get_packet rq
;
482 struct lttng_viewer_trace_packet rp
;
485 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_PACKET
);
486 cmd
.data_size
= sizeof(rq
);
489 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
490 /* Already in big endian. */
492 rq
.len
= htobe32(len
);
495 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
496 } while (ret
< 0 && errno
== EINTR
);
498 fprintf(stderr
, "Error sending cmd\n");
502 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
503 } while (ret
< 0 && errno
== EINTR
);
505 fprintf(stderr
, "Error sending get_data_packet request\n");
509 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
510 } while (ret
< 0 && errno
== EINTR
);
512 fprintf(stderr
, "Error receiving data response\n");
515 rp
.flags
= be32toh(rp
.flags
);
517 switch (be32toh(rp
.status
)) {
518 case LTTNG_VIEWER_GET_PACKET_OK
:
520 case LTTNG_VIEWER_GET_PACKET_RETRY
:
521 fprintf(stderr
, "RETRY\n");
524 case LTTNG_VIEWER_GET_PACKET_ERR
:
525 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
526 fprintf(stderr
, "NEW_METADATA\n");
530 fprintf(stderr
, "ERR\n");
534 fprintf(stderr
, "UNKNOWN\n");
/* From here on len is the payload length announced by the peer. */
539 len
= be32toh(rp
.len
);
/* The receive buffer is the stream's mmap_size-byte mapping. */
544 if (len
> mmap_size
) {
545 fprintf(stderr
, "mmap_size not big enough\n");
551 ret
= recv(control_sock
, session
->streams
[id
].mmap_base
, len
, MSG_WAITALL
);
552 } while (ret
< 0 && errno
== EINTR
);
554 fprintf(stderr
, "Error receiving trace packet\n");
/*
 * TAP driver for the live viewer test.
 *
 * Declares NUM_TESTS planned tests, then runs the steps in order and reports
 * each with ok(): connect_viewer("localhost"), establish_connection(),
 * list_sessions(), attach_session(), get_metadata(), get_next_index() and
 * get_data_packet() for the packet recorded by get_next_index(). Returns
 * exit_status().
 *
 * NOTE(review): no early exit between the steps is visible, so later steps
 * appear to run even after an earlier ok() failed - e.g. attach_session()
 * would be called with whatever session_id holds if list_sessions() did not
 * set it. Confirm session_id is initialised on a line not shown.
 * NOTE(review): argc and argv are not used in the visible code.
 */
564 int main(int argc
, char **argv
)
569 plan_tests(NUM_TESTS
);
571 diag("Live unit tests");
573 ret
= connect_viewer("localhost");
574 ok(ret
== 0, "Connect viewer to relayd");
576 ret
= establish_connection();
577 ok(ret
== 0, "Established connection and version check with %d.%d",
578 VERSION_MAJOR
, VERSION_MINOR
);
580 ret
= list_sessions(&session_id
);
581 ok(ret
> 0, "List sessions : %d session(s)", ret
);
583 ret
= attach_session(session_id
);
584 ok(ret
> 0, "Attach to session, %d streams received", ret
);
586 ret
= get_metadata();
587 ok(ret
> 0, "Get metadata, received %d bytes", ret
);
589 ret
= get_next_index();
590 ok(ret
== 0, "Get one index per stream");
/* Fetch the packet whose location get_next_index() recorded. */
592 ret
= get_data_packet(first_packet_stream_id
, first_packet_offset
,
594 ok(ret
== first_packet_len
,
595 "Get one data packet for stream %d, offset %d, len %d",
596 first_packet_stream_id
, first_packet_offset
,
599 return exit_status();