create libustd from ustd
[lttng-ust.git] / libustd / libustd.c
... / ...
CommitLineData
1/* Copyright (C) 2009 Pierre-Marc Fournier
2 * 2010 Alexis Halle
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19#define _GNU_SOURCE
20
21#include <sys/shm.h>
22#include <unistd.h>
23#include <pthread.h>
24#include <signal.h>
25
26#include <stdlib.h>
27#include <stdio.h>
28#include <string.h>
29#include <errno.h>
30#include <assert.h>
31
32#include "libustd.h"
33#include "usterr.h"
34#include "ustcomm.h"
35
36/* return value: 0 = subbuffer is finished, it won't produce data anymore
37 * 1 = got subbuffer successfully
38 * <0 = error
39 */
40
41#define GET_SUBBUF_OK 1
42#define GET_SUBBUF_DONE 0
43#define GET_SUBBUF_DIED 2
44
45#define PUT_SUBBUF_OK 1
46#define PUT_SUBBUF_DIED 0
47#define PUT_SUBBUF_PUSHED 2
48#define PUT_SUBBUF_DONE 3
49
50#define UNIX_PATH_MAX 108
51
52int get_subbuffer(struct buffer_info *buf)
53{
54 char *send_msg=NULL;
55 char *received_msg=NULL;
56 char *rep_code=NULL;
57 int retval;
58 int result;
59
60 asprintf(&send_msg, "get_subbuffer %s", buf->name);
61 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
62 if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
63 DBG("app died while being traced");
64 retval = GET_SUBBUF_DIED;
65 goto end;
66 }
67 else if(result < 0) {
68 ERR("get_subbuffer: ustcomm_send_request failed");
69 retval = -1;
70 goto end;
71 }
72
73 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
74 if(result != 2 && result != 1) {
75 ERR("unable to parse response to get_subbuffer");
76 retval = -1;
77 free(received_msg);
78 goto end_rep;
79 }
80
81 if(!strcmp(rep_code, "OK")) {
82 DBG("got subbuffer %s", buf->name);
83 retval = GET_SUBBUF_OK;
84 }
85 else if(nth_token_is(received_msg, "END", 0) == 1) {
86 retval = GET_SUBBUF_DONE;
87 goto end_rep;
88 }
89 else if(!strcmp(received_msg, "NOTFOUND")) {
90 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
91 retval = GET_SUBBUF_DIED;
92 goto end_rep;
93 }
94 else {
95 DBG("error getting subbuffer %s", buf->name);
96 retval = -1;
97 }
98
99 /* FIXME: free correctly the stuff */
100end_rep:
101 if(rep_code)
102 free(rep_code);
103end:
104 if(send_msg)
105 free(send_msg);
106 if(received_msg)
107 free(received_msg);
108
109 return retval;
110}
111
112int put_subbuffer(struct buffer_info *buf)
113{
114 char *send_msg=NULL;
115 char *received_msg=NULL;
116 char *rep_code=NULL;
117 int retval;
118 int result;
119
120 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
121 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
122 if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
123 retval = PUT_SUBBUF_DIED;
124 goto end;
125 }
126 else if(result < 0) {
127 ERR("put_subbuffer: send_message failed");
128 retval = -1;
129 goto end;
130 }
131 else if(result == 0) {
132 /* Program seems finished. However this might not be
133 * the last subbuffer that has to be collected.
134 */
135 retval = PUT_SUBBUF_DIED;
136 goto end;
137 }
138
139 result = sscanf(received_msg, "%as", &rep_code);
140 if(result != 1) {
141 ERR("unable to parse response to put_subbuffer");
142 retval = -1;
143 goto end_rep;
144 }
145
146 if(!strcmp(rep_code, "OK")) {
147 DBG("subbuffer put %s", buf->name);
148 retval = PUT_SUBBUF_OK;
149 }
150 else if(!strcmp(received_msg, "NOTFOUND")) {
151 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
152 /* However, maybe this was not the last subbuffer. So
153 * we return the program died.
154 */
155 retval = PUT_SUBBUF_DIED;
156 goto end_rep;
157 }
158 else {
159 DBG("put_subbuffer: received error, we were pushed");
160 retval = PUT_SUBBUF_PUSHED;
161 goto end_rep;
162 }
163
164end_rep:
165 if(rep_code)
166 free(rep_code);
167
168end:
169 if(send_msg)
170 free(send_msg);
171 if(received_msg)
172 free(received_msg);
173
174 return retval;
175}
176
177void decrement_active_buffers(void *arg)
178{
179 struct libustd_instance *instance = arg;
180 pthread_mutex_lock(&instance->mutex);
181 instance->active_buffers--;
182 pthread_mutex_unlock(&instance->mutex);
183}
184
185struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, const char *bufname)
186{
187 struct buffer_info *buf;
188 char *send_msg;
189 char *received_msg;
190 int result;
191 char *tmp;
192 int fd;
193 struct shmid_ds shmds;
194
195 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
196 if(buf == NULL) {
197 ERR("add_buffer: insufficient memory");
198 return NULL;
199 }
200
201 buf->name = bufname;
202 buf->pid = pid;
203
204 /* connect to app */
205 result = ustcomm_connect_app(buf->pid, &buf->conn);
206 if(result) {
207 WARN("unable to connect to process, it probably died before we were able to connect");
208 return NULL;
209 }
210
211 /* get pidunique */
212 asprintf(&send_msg, "get_pidunique");
213 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
214 free(send_msg);
215 if(result == -1) {
216 ERR("problem in ustcomm_send_request(get_pidunique)");
217 return NULL;
218 }
219 if(result == 0) {
220 goto error;
221 }
222
223 result = sscanf(received_msg, "%lld", &buf->pidunique);
224 if(result != 1) {
225 ERR("unable to parse response to get_pidunique");
226 return NULL;
227 }
228 free(received_msg);
229 DBG("got pidunique %lld", buf->pidunique);
230
231 /* get shmid */
232 asprintf(&send_msg, "get_shmid %s", buf->name);
233 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
234 free(send_msg);
235 if(result == -1) {
236 ERR("problem in ustcomm_send_request(get_shmid)");
237 return NULL;
238 }
239 if(result == 0) {
240 goto error;
241 }
242
243 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
244 if(result != 2) {
245 ERR("unable to parse response to get_shmid (\"%s\")", received_msg);
246 return NULL;
247 }
248 free(received_msg);
249 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
250
251 /* get n_subbufs */
252 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
253 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
254 free(send_msg);
255 if(result == -1) {
256 ERR("problem in ustcomm_send_request(g_n_subbufs)");
257 return NULL;
258 }
259 if(result == 0) {
260 goto error;
261 }
262
263 result = sscanf(received_msg, "%d", &buf->n_subbufs);
264 if(result != 1) {
265 ERR("unable to parse response to get_n_subbufs");
266 return NULL;
267 }
268 free(received_msg);
269 DBG("got n_subbufs %d", buf->n_subbufs);
270
271 /* get subbuf size */
272 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
273 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
274 free(send_msg);
275 if(result == -1) {
276 ERR("problem in ustcomm_send_request(get_subbuf_size)");
277 return NULL;
278 }
279 if(result == 0) {
280 goto error;
281 }
282
283 result = sscanf(received_msg, "%d", &buf->subbuf_size);
284 if(result != 1) {
285 ERR("unable to parse response to get_subbuf_size");
286 return NULL;
287 }
288 free(received_msg);
289 DBG("got subbuf_size %d", buf->subbuf_size);
290
291 /* attach memory */
292 buf->mem = shmat(buf->shmid, NULL, 0);
293 if(buf->mem == (void *) 0) {
294 PERROR("shmat");
295 return NULL;
296 }
297 DBG("successfully attached buffer memory");
298
299 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
300 if(buf->bufstruct_mem == (void *) 0) {
301 PERROR("shmat");
302 return NULL;
303 }
304 DBG("successfully attached buffer bufstruct memory");
305
306 /* obtain info on the memory segment */
307 result = shmctl(buf->shmid, IPC_STAT, &shmds);
308 if(result == -1) {
309 PERROR("shmctl");
310 return NULL;
311 }
312 buf->memlen = shmds.shm_segsz;
313
314 if(instance->callbacks->on_open_buffer)
315 instance->callbacks->on_open_buffer(instance->callbacks, buf);
316
317 pthread_mutex_lock(&instance->mutex);
318 instance->active_buffers++;
319 pthread_mutex_unlock(&instance->mutex);
320
321 return buf;
322
323error:
324 free(buf);
325 return NULL;
326}
327
328static void destroy_buffer(struct libustd_callbacks *callbacks,
329 struct buffer_info *buf)
330{
331 int result;
332
333 result = ustcomm_close_app(&buf->conn);
334 if(result == -1) {
335 WARN("problem calling ustcomm_close_app");
336 }
337
338 result = shmdt(buf->mem);
339 if(result == -1) {
340 PERROR("shmdt");
341 }
342
343 result = shmdt(buf->bufstruct_mem);
344 if(result == -1) {
345 PERROR("shmdt");
346 }
347
348 if(callbacks->on_close_buffer)
349 callbacks->on_close_buffer(callbacks, buf);
350
351 free(buf);
352}
353
354int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
355{
356 int result;
357
358 pthread_cleanup_push(decrement_active_buffers, instance);
359
360 for(;;) {
361 /* get the subbuffer */
362 result = get_subbuffer(buf);
363 if(result == -1) {
364 ERR("error getting subbuffer");
365 continue;
366 }
367 else if(result == GET_SUBBUF_DONE) {
368 /* this is done */
369 break;
370 }
371 else if(result == GET_SUBBUF_DIED) {
372 finish_consuming_dead_subbuffer(instance->callbacks, buf);
373 break;
374 }
375
376 if(instance->callbacks->on_read_subbuffer)
377 instance->callbacks->on_read_subbuffer(instance->callbacks, buf);
378
379 /* put the subbuffer */
380 result = put_subbuffer(buf);
381 if(result == -1) {
382 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
383 break;
384 }
385 else if(result == PUT_SUBBUF_PUSHED) {
386 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
387 break;
388 }
389 else if(result == PUT_SUBBUF_DIED) {
390 DBG("application died while putting subbuffer");
391 /* Skip the first subbuffer. We are not sure it is trustable
392 * because the put_subbuffer() did not complete.
393 */
394 if(instance->callbacks->on_put_error)
395 instance->callbacks->on_put_error(instance->callbacks, buf);
396
397 finish_consuming_dead_subbuffer(instance->callbacks, buf);
398 break;
399 }
400 else if(result == PUT_SUBBUF_DONE) {
401 /* Done with this subbuffer */
402 /* FIXME: add a case where this branch is used? Upon
403 * normal trace termination, at put_subbuf time, a
404 * special last-subbuffer code could be returned by
405 * the listener.
406 */
407 break;
408 }
409 else if(result == PUT_SUBBUF_OK) {
410 }
411 }
412
413 DBG("thread for buffer %s is stopping", buf->name);
414
415 /* FIXME: destroy, unalloc... */
416
417 pthread_cleanup_pop(1);
418
419 return 0;
420}
421
422struct consumer_thread_args {
423 pid_t pid;
424 const char *bufname;
425 struct libustd_instance *instance;
426};
427
428void *consumer_thread(void *arg)
429{
430 struct buffer_info *buf;
431 struct consumer_thread_args *args = (struct consumer_thread_args *) arg;
432 int result;
433 sigset_t sigset;
434
435 DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname);
436
437 if(args->instance->callbacks->on_new_thread)
438 args->instance->callbacks->on_new_thread(args->instance->callbacks);
439
440 /* Block signals that should be handled by the main thread. */
441 result = sigemptyset(&sigset);
442 if(result == -1) {
443 PERROR("sigemptyset");
444 goto end;
445 }
446 result = sigaddset(&sigset, SIGTERM);
447 if(result == -1) {
448 PERROR("sigaddset");
449 goto end;
450 }
451 result = sigaddset(&sigset, SIGINT);
452 if(result == -1) {
453 PERROR("sigaddset");
454 goto end;
455 }
456 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
457 if(result == -1) {
458 PERROR("sigprocmask");
459 goto end;
460 }
461
462 buf = connect_buffer(args->instance, args->pid, args->bufname);
463 if(buf == NULL) {
464 ERR("failed to connect to buffer");
465 goto end;
466 }
467
468 consumer_loop(args->instance, buf);
469
470 destroy_buffer(args->instance->callbacks, buf);
471
472 end:
473
474 if(args->instance->callbacks->on_close_thread)
475 args->instance->callbacks->on_close_thread(args->instance->callbacks);
476
477 free((void *)args->bufname);
478 free(args);
479 return NULL;
480}
481
482int start_consuming_buffer(
483 struct libustd_instance *instance, pid_t pid, const char *bufname)
484{
485 pthread_t thr;
486 struct consumer_thread_args *args;
487 int result;
488
489 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname);
490
491 args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args));
492
493 args->pid = pid;
494 args->bufname = strdup(bufname);
495 args->instance = instance;
496 DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
497
498 result = pthread_create(&thr, NULL, consumer_thread, args);
499 if(result == -1) {
500 ERR("pthread_create failed");
501 return -1;
502 }
503 result = pthread_detach(thr);
504 if(result == -1) {
505 ERR("pthread_detach failed");
506 return -1;
507 }
508 DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
509
510 return 0;
511}
512
513int libustd_start_instance(struct libustd_instance *instance)
514{
515 int result;
516 int timeout = -1;
517
518 if(!instance->is_init) {
519 ERR("libustd instance not initialized");
520 return 1;
521 }
522
523 /* app loop */
524 for(;;) {
525 char *recvbuf;
526
527 /* check for requests on our public socket */
528 result = ustcomm_ustd_recv_message(&instance->comm, &recvbuf, NULL, timeout);
529 if(result == -1 && errno == EINTR) {
530 /* Caught signal */
531 }
532 else if(result == -1) {
533 ERR("error in ustcomm_ustd_recv_message");
534 goto loop_end;
535 }
536 else if(result > 0) {
537 if(!strncmp(recvbuf, "collect", 7)) {
538 pid_t pid;
539 char *bufname;
540 int result;
541
542 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
543 if(result != 2) {
544 ERR("parsing error: %s", recvbuf);
545 goto free_bufname;
546 }
547
548 result = start_consuming_buffer(instance, pid, bufname);
549 if(result < 0) {
550 ERR("error in add_buffer");
551 goto free_bufname;
552 }
553
554 free_bufname:
555 free(bufname);
556 }
557 else if(!strncmp(recvbuf, "exit", 4)) {
558 /* Only there to force poll to return */
559 }
560 else {
561 WARN("unknown command: %s", recvbuf);
562 }
563
564 free(recvbuf);
565 }
566
567 loop_end:
568
569 if(instance->quit_program) {
570 pthread_mutex_lock(&instance->mutex);
571 if(instance->active_buffers == 0) {
572 pthread_mutex_unlock(&instance->mutex);
573 break;
574 }
575 pthread_mutex_unlock(&instance->mutex);
576 timeout = 100;
577 }
578 }
579
580 if(instance->callbacks->on_trace_end)
581 instance->callbacks->on_trace_end(instance);
582
583 libustd_delete_instance(instance);
584
585 return 0;
586}
587
588void libustd_delete_instance(struct libustd_instance *instance)
589{
590 if(instance->is_init)
591 ustcomm_fini_ustd(&instance->comm);
592
593 pthread_mutex_destroy(&instance->mutex);
594 free(instance->sock_path);
595 free(instance);
596}
597
598int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
599{
600 int result;
601 int fd;
602 int bytes = 0;
603
604 char msg[] = "exit";
605
606 instance->quit_program = 1;
607
608 if(!send_msg)
609 return 0;
610
611 /* Send a message through the socket to force poll to return */
612
613 struct sockaddr_un addr;
614
615 result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
616 if(result == -1) {
617 PERROR("socket");
618 return 1;
619 }
620
621 addr.sun_family = AF_UNIX;
622
623 strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX);
624 addr.sun_path[UNIX_PATH_MAX-1] = '\0';
625
626 result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
627 if(result == -1) {
628 PERROR("connect");
629 }
630
631 while(bytes != sizeof(msg))
632 bytes += send(fd, msg, sizeof(msg), 0);
633
634 close(fd);
635
636 return 0;
637}
638
639struct libustd_instance *libustd_new_instance(
640 struct libustd_callbacks *callbacks, char *sock_path)
641{
642 struct libustd_instance *instance =
643 malloc(sizeof(struct libustd_instance));
644 if(!instance)
645 return NULL;
646
647 instance->callbacks = callbacks;
648 instance->quit_program = 0;
649 instance->is_init = 0;
650 instance->active_buffers = 0;
651 pthread_mutex_init(&instance->mutex, NULL);
652
653 if(sock_path)
654 instance->sock_path = strdup(sock_path);
655 else
656 instance->sock_path = NULL;
657
658 return instance;
659}
660
661int libustd_init_instance(struct libustd_instance *instance)
662{
663 int result;
664 result = ustcomm_init_ustd(&instance->comm, instance->sock_path);
665 if(result == -1) {
666 ERR("failed to initialize socket");
667 return 1;
668 }
669 instance->is_init = 1;
670 return 0;
671}
672
This page took 0.025644 seconds and 4 git commands to generate.