Fix a broken test makefile
[lttng-ust.git] / libustd / libustd.c
1 /* Copyright (C) 2009 Pierre-Marc Fournier
2 * 2010 Alexis Halle
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #define _GNU_SOURCE
20
21 #include <sys/shm.h>
22 #include <unistd.h>
23 #include <pthread.h>
24 #include <signal.h>
25
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <assert.h>
31
32 #include <ust/ustd.h>
33 #include "lowlevel.h"
34 #include "usterr.h"
35 #include "ustcomm.h"
36
37 /* return value: 0 = subbuffer is finished, it won't produce data anymore
38 * 1 = got subbuffer successfully
39 * <0 = error
40 */
41
42 #define GET_SUBBUF_OK 1
43 #define GET_SUBBUF_DONE 0
44 #define GET_SUBBUF_DIED 2
45
46 #define PUT_SUBBUF_OK 1
47 #define PUT_SUBBUF_DIED 0
48 #define PUT_SUBBUF_PUSHED 2
49 #define PUT_SUBBUF_DONE 3
50
51 #define UNIX_PATH_MAX 108
52
53 int get_subbuffer(struct buffer_info *buf)
54 {
55 char *send_msg=NULL;
56 char *received_msg=NULL;
57 char *rep_code=NULL;
58 int retval;
59 int result;
60
61 asprintf(&send_msg, "get_subbuffer %s", buf->name);
62 result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
63 if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
64 DBG("app died while being traced");
65 retval = GET_SUBBUF_DIED;
66 goto end;
67 }
68 else if(result < 0) {
69 ERR("get_subbuffer: ustcomm_send_request failed");
70 retval = -1;
71 goto end;
72 }
73
74 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
75 if(result != 2 && result != 1) {
76 ERR("unable to parse response to get_subbuffer");
77 retval = -1;
78 free(received_msg);
79 goto end_rep;
80 }
81
82 if(!strcmp(rep_code, "OK")) {
83 DBG("got subbuffer %s", buf->name);
84 retval = GET_SUBBUF_OK;
85 }
86 else if(nth_token_is(received_msg, "END", 0) == 1) {
87 retval = GET_SUBBUF_DONE;
88 goto end_rep;
89 }
90 else if(!strcmp(received_msg, "NOTFOUND")) {
91 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
92 retval = GET_SUBBUF_DIED;
93 goto end_rep;
94 }
95 else {
96 DBG("error getting subbuffer %s", buf->name);
97 retval = -1;
98 }
99
100 /* FIXME: free correctly the stuff */
101 end_rep:
102 if(rep_code)
103 free(rep_code);
104 end:
105 if(send_msg)
106 free(send_msg);
107 if(received_msg)
108 free(received_msg);
109
110 return retval;
111 }
112
113 int put_subbuffer(struct buffer_info *buf)
114 {
115 char *send_msg=NULL;
116 char *received_msg=NULL;
117 char *rep_code=NULL;
118 int retval;
119 int result;
120
121 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
122 result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
123 if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
124 retval = PUT_SUBBUF_DIED;
125 goto end;
126 }
127 else if(result < 0) {
128 ERR("put_subbuffer: send_message failed");
129 retval = -1;
130 goto end;
131 }
132 else if(result == 0) {
133 /* Program seems finished. However this might not be
134 * the last subbuffer that has to be collected.
135 */
136 retval = PUT_SUBBUF_DIED;
137 goto end;
138 }
139
140 result = sscanf(received_msg, "%as", &rep_code);
141 if(result != 1) {
142 ERR("unable to parse response to put_subbuffer");
143 retval = -1;
144 goto end_rep;
145 }
146
147 if(!strcmp(rep_code, "OK")) {
148 DBG("subbuffer put %s", buf->name);
149 retval = PUT_SUBBUF_OK;
150 }
151 else if(!strcmp(received_msg, "NOTFOUND")) {
152 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
153 /* However, maybe this was not the last subbuffer. So
154 * we return the program died.
155 */
156 retval = PUT_SUBBUF_DIED;
157 goto end_rep;
158 }
159 else {
160 DBG("put_subbuffer: received error, we were pushed");
161 retval = PUT_SUBBUF_PUSHED;
162 goto end_rep;
163 }
164
165 end_rep:
166 if(rep_code)
167 free(rep_code);
168
169 end:
170 if(send_msg)
171 free(send_msg);
172 if(received_msg)
173 free(received_msg);
174
175 return retval;
176 }
177
178 void decrement_active_buffers(void *arg)
179 {
180 struct libustd_instance *instance = arg;
181 pthread_mutex_lock(&instance->mutex);
182 instance->active_buffers--;
183 pthread_mutex_unlock(&instance->mutex);
184 }
185
186 struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, const char *bufname)
187 {
188 struct buffer_info *buf;
189 char *send_msg;
190 char *received_msg;
191 int result;
192 struct shmid_ds shmds;
193
194 buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
195 if(buf == NULL) {
196 ERR("add_buffer: insufficient memory");
197 return NULL;
198 }
199
200 buf->conn = malloc(sizeof(struct ustcomm_connection));
201 if(buf->conn == NULL) {
202 ERR("add_buffer: insufficient memory");
203 free(buf);
204 return NULL;
205 }
206
207 buf->name = bufname;
208 buf->pid = pid;
209
210 /* connect to app */
211 result = ustcomm_connect_app(buf->pid, buf->conn);
212 if(result) {
213 WARN("unable to connect to process, it probably died before we were able to connect");
214 return NULL;
215 }
216
217 /* get pidunique */
218 asprintf(&send_msg, "get_pidunique");
219 result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
220 free(send_msg);
221 if(result == -1) {
222 ERR("problem in ustcomm_send_request(get_pidunique)");
223 return NULL;
224 }
225 if(result == 0) {
226 goto error;
227 }
228
229 result = sscanf(received_msg, "%lld", &buf->pidunique);
230 if(result != 1) {
231 ERR("unable to parse response to get_pidunique");
232 return NULL;
233 }
234 free(received_msg);
235 DBG("got pidunique %lld", buf->pidunique);
236
237 /* get shmid */
238 asprintf(&send_msg, "get_shmid %s", buf->name);
239 result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
240 free(send_msg);
241 if(result == -1) {
242 ERR("problem in ustcomm_send_request(get_shmid)");
243 return NULL;
244 }
245 if(result == 0) {
246 goto error;
247 }
248
249 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
250 if(result != 2) {
251 ERR("unable to parse response to get_shmid (\"%s\")", received_msg);
252 return NULL;
253 }
254 free(received_msg);
255 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
256
257 /* get n_subbufs */
258 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
259 result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
260 free(send_msg);
261 if(result == -1) {
262 ERR("problem in ustcomm_send_request(g_n_subbufs)");
263 return NULL;
264 }
265 if(result == 0) {
266 goto error;
267 }
268
269 result = sscanf(received_msg, "%d", &buf->n_subbufs);
270 if(result != 1) {
271 ERR("unable to parse response to get_n_subbufs");
272 return NULL;
273 }
274 free(received_msg);
275 DBG("got n_subbufs %d", buf->n_subbufs);
276
277 /* get subbuf size */
278 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
279 result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
280 free(send_msg);
281 if(result == -1) {
282 ERR("problem in ustcomm_send_request(get_subbuf_size)");
283 return NULL;
284 }
285 if(result == 0) {
286 goto error;
287 }
288
289 result = sscanf(received_msg, "%d", &buf->subbuf_size);
290 if(result != 1) {
291 ERR("unable to parse response to get_subbuf_size");
292 return NULL;
293 }
294 free(received_msg);
295 DBG("got subbuf_size %d", buf->subbuf_size);
296
297 /* attach memory */
298 buf->mem = shmat(buf->shmid, NULL, 0);
299 if(buf->mem == (void *) 0) {
300 PERROR("shmat");
301 return NULL;
302 }
303 DBG("successfully attached buffer memory");
304
305 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
306 if(buf->bufstruct_mem == (void *) 0) {
307 PERROR("shmat");
308 return NULL;
309 }
310 DBG("successfully attached buffer bufstruct memory");
311
312 /* obtain info on the memory segment */
313 result = shmctl(buf->shmid, IPC_STAT, &shmds);
314 if(result == -1) {
315 PERROR("shmctl");
316 return NULL;
317 }
318 buf->memlen = shmds.shm_segsz;
319
320 if(instance->callbacks->on_open_buffer)
321 instance->callbacks->on_open_buffer(instance->callbacks, buf);
322
323 pthread_mutex_lock(&instance->mutex);
324 instance->active_buffers++;
325 pthread_mutex_unlock(&instance->mutex);
326
327 return buf;
328
329 error:
330 free(buf);
331 return NULL;
332 }
333
334 static void destroy_buffer(struct libustd_callbacks *callbacks,
335 struct buffer_info *buf)
336 {
337 int result;
338
339 result = ustcomm_close_app(buf->conn);
340 if(result == -1) {
341 WARN("problem calling ustcomm_close_app");
342 }
343
344 result = shmdt(buf->mem);
345 if(result == -1) {
346 PERROR("shmdt");
347 }
348
349 result = shmdt(buf->bufstruct_mem);
350 if(result == -1) {
351 PERROR("shmdt");
352 }
353
354 if(callbacks->on_close_buffer)
355 callbacks->on_close_buffer(callbacks, buf);
356
357 free(buf->conn);
358 free(buf);
359 }
360
361 int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
362 {
363 int result;
364
365 pthread_cleanup_push(decrement_active_buffers, instance);
366
367 for(;;) {
368 /* get the subbuffer */
369 result = get_subbuffer(buf);
370 if(result == -1) {
371 ERR("error getting subbuffer");
372 continue;
373 }
374 else if(result == GET_SUBBUF_DONE) {
375 /* this is done */
376 break;
377 }
378 else if(result == GET_SUBBUF_DIED) {
379 finish_consuming_dead_subbuffer(instance->callbacks, buf);
380 break;
381 }
382
383 if(instance->callbacks->on_read_subbuffer)
384 instance->callbacks->on_read_subbuffer(instance->callbacks, buf);
385
386 /* put the subbuffer */
387 result = put_subbuffer(buf);
388 if(result == -1) {
389 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
390 break;
391 }
392 else if(result == PUT_SUBBUF_PUSHED) {
393 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
394 break;
395 }
396 else if(result == PUT_SUBBUF_DIED) {
397 DBG("application died while putting subbuffer");
398 /* Skip the first subbuffer. We are not sure it is trustable
399 * because the put_subbuffer() did not complete.
400 */
401 if(instance->callbacks->on_put_error)
402 instance->callbacks->on_put_error(instance->callbacks, buf);
403
404 finish_consuming_dead_subbuffer(instance->callbacks, buf);
405 break;
406 }
407 else if(result == PUT_SUBBUF_DONE) {
408 /* Done with this subbuffer */
409 /* FIXME: add a case where this branch is used? Upon
410 * normal trace termination, at put_subbuf time, a
411 * special last-subbuffer code could be returned by
412 * the listener.
413 */
414 break;
415 }
416 else if(result == PUT_SUBBUF_OK) {
417 }
418 }
419
420 DBG("thread for buffer %s is stopping", buf->name);
421
422 /* FIXME: destroy, unalloc... */
423
424 pthread_cleanup_pop(1);
425
426 return 0;
427 }
428
429 struct consumer_thread_args {
430 pid_t pid;
431 const char *bufname;
432 struct libustd_instance *instance;
433 };
434
435 void *consumer_thread(void *arg)
436 {
437 struct buffer_info *buf;
438 struct consumer_thread_args *args = (struct consumer_thread_args *) arg;
439 int result;
440 sigset_t sigset;
441
442 DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname);
443
444 if(args->instance->callbacks->on_new_thread)
445 args->instance->callbacks->on_new_thread(args->instance->callbacks);
446
447 /* Block signals that should be handled by the main thread. */
448 result = sigemptyset(&sigset);
449 if(result == -1) {
450 PERROR("sigemptyset");
451 goto end;
452 }
453 result = sigaddset(&sigset, SIGTERM);
454 if(result == -1) {
455 PERROR("sigaddset");
456 goto end;
457 }
458 result = sigaddset(&sigset, SIGINT);
459 if(result == -1) {
460 PERROR("sigaddset");
461 goto end;
462 }
463 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
464 if(result == -1) {
465 PERROR("sigprocmask");
466 goto end;
467 }
468
469 buf = connect_buffer(args->instance, args->pid, args->bufname);
470 if(buf == NULL) {
471 ERR("failed to connect to buffer");
472 goto end;
473 }
474
475 consumer_loop(args->instance, buf);
476
477 destroy_buffer(args->instance->callbacks, buf);
478
479 end:
480
481 if(args->instance->callbacks->on_close_thread)
482 args->instance->callbacks->on_close_thread(args->instance->callbacks);
483
484 free((void *)args->bufname);
485 free(args);
486 return NULL;
487 }
488
489 int start_consuming_buffer(
490 struct libustd_instance *instance, pid_t pid, const char *bufname)
491 {
492 pthread_t thr;
493 struct consumer_thread_args *args;
494 int result;
495
496 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname);
497
498 args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args));
499
500 args->pid = pid;
501 args->bufname = strdup(bufname);
502 args->instance = instance;
503 DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
504
505 result = pthread_create(&thr, NULL, consumer_thread, args);
506 if(result == -1) {
507 ERR("pthread_create failed");
508 return -1;
509 }
510 result = pthread_detach(thr);
511 if(result == -1) {
512 ERR("pthread_detach failed");
513 return -1;
514 }
515 DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
516
517 return 0;
518 }
519
520 int libustd_start_instance(struct libustd_instance *instance)
521 {
522 int result;
523 int timeout = -1;
524
525 if(!instance->is_init) {
526 ERR("libustd instance not initialized");
527 return 1;
528 }
529
530 /* app loop */
531 for(;;) {
532 char *recvbuf;
533
534 /* check for requests on our public socket */
535 result = ustcomm_ustd_recv_message(instance->comm, &recvbuf, NULL, timeout);
536 if(result == -1 && errno == EINTR) {
537 /* Caught signal */
538 }
539 else if(result == -1) {
540 ERR("error in ustcomm_ustd_recv_message");
541 goto loop_end;
542 }
543 else if(result > 0) {
544 if(!strncmp(recvbuf, "collect", 7)) {
545 pid_t pid;
546 char *bufname;
547 int result;
548
549 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
550 if(result != 2) {
551 ERR("parsing error: %s", recvbuf);
552 goto free_bufname;
553 }
554
555 result = start_consuming_buffer(instance, pid, bufname);
556 if(result < 0) {
557 ERR("error in add_buffer");
558 goto free_bufname;
559 }
560
561 free_bufname:
562 free(bufname);
563 }
564 else if(!strncmp(recvbuf, "exit", 4)) {
565 /* Only there to force poll to return */
566 }
567 else {
568 WARN("unknown command: %s", recvbuf);
569 }
570
571 free(recvbuf);
572 }
573
574 loop_end:
575
576 if(instance->quit_program) {
577 pthread_mutex_lock(&instance->mutex);
578 if(instance->active_buffers == 0) {
579 pthread_mutex_unlock(&instance->mutex);
580 break;
581 }
582 pthread_mutex_unlock(&instance->mutex);
583 timeout = 100;
584 }
585 }
586
587 if(instance->callbacks->on_trace_end)
588 instance->callbacks->on_trace_end(instance);
589
590 libustd_delete_instance(instance);
591
592 return 0;
593 }
594
595 void libustd_delete_instance(struct libustd_instance *instance)
596 {
597 if(instance->is_init)
598 ustcomm_fini_ustd(instance->comm);
599
600 pthread_mutex_destroy(&instance->mutex);
601 free(instance->sock_path);
602 free(instance->comm);
603 free(instance);
604 }
605
606 int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
607 {
608 int result;
609 int fd;
610 int bytes = 0;
611
612 char msg[] = "exit";
613
614 instance->quit_program = 1;
615
616 if(!send_msg)
617 return 0;
618
619 /* Send a message through the socket to force poll to return */
620
621 struct sockaddr_un addr;
622
623 result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
624 if(result == -1) {
625 PERROR("socket");
626 return 1;
627 }
628
629 addr.sun_family = AF_UNIX;
630
631 strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX);
632 addr.sun_path[UNIX_PATH_MAX-1] = '\0';
633
634 result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
635 if(result == -1) {
636 PERROR("connect");
637 }
638
639 while(bytes != sizeof(msg))
640 bytes += send(fd, msg, sizeof(msg), 0);
641
642 close(fd);
643
644 return 0;
645 }
646
647 struct libustd_instance *libustd_new_instance(
648 struct libustd_callbacks *callbacks, char *sock_path)
649 {
650 struct libustd_instance *instance =
651 zmalloc(sizeof(struct libustd_instance));
652 if(!instance)
653 return NULL;
654
655 instance->comm = malloc(sizeof(struct ustcomm_ustd));
656 if(!instance->comm) {
657 free(instance);
658 return NULL;
659 }
660
661 instance->callbacks = callbacks;
662 instance->quit_program = 0;
663 instance->is_init = 0;
664 instance->active_buffers = 0;
665 pthread_mutex_init(&instance->mutex, NULL);
666
667 if(sock_path)
668 instance->sock_path = strdup(sock_path);
669 else
670 instance->sock_path = NULL;
671
672 return instance;
673 }
674
675 int libustd_init_instance(struct libustd_instance *instance)
676 {
677 int result;
678 result = ustcomm_init_ustd(instance->comm, instance->sock_path);
679 if(result == -1) {
680 ERR("failed to initialize socket");
681 return 1;
682 }
683 instance->is_init = 1;
684 return 0;
685 }
686
This page took 0.05143 seconds and 5 git commands to generate.