UST-wide warning fixes
[lttng-ust.git] / libustconsumer / libustconsumer.c
1 /* Copyright (C) 2009 Pierre-Marc Fournier
2 * 2010 Alexis Halle
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #define _GNU_SOURCE
20
21 #include <sys/epoll.h>
22 #include <sys/shm.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <unistd.h>
26 #include <pthread.h>
27 #include <signal.h>
28
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <assert.h>
34
35 #include <ust/ustconsumer.h>
36 #include "lowlevel.h"
37 #include "usterr.h"
38 #include "ustcomm.h"
39
/* Status codes used for the result of get_subbuffer(). */
#define GET_SUBBUF_OK 1
#define GET_SUBBUF_DONE 0
#define GET_SUBBUF_DIED 2

/* Status codes used for the result of put_subbuffer(). */
#define PUT_SUBBUF_OK 1
#define PUT_SUBBUF_DIED 0
#define PUT_SUBBUF_PUSHED 2
#define PUT_SUBBUF_DONE 3

/* Length bound for UNIX domain socket paths (sockaddr_un.sun_path). */
#define UNIX_PATH_MAX 108
50
51 static int get_subbuffer(struct buffer_info *buf)
52 {
53 struct ustcomm_header _send_hdr, *send_hdr;
54 struct ustcomm_header _recv_hdr, *recv_hdr;
55 struct ustcomm_buffer_info _send_msg, _recv_msg;
56 struct ustcomm_buffer_info *send_msg, *recv_msg;
57 int result;
58
59 send_hdr = &_send_hdr;
60 recv_hdr = &_recv_hdr;
61 send_msg = &_send_msg;
62 recv_msg = &_recv_msg;
63
64 result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
65 buf->channel, buf->channel_cpu);
66 if (result < 0) {
67 return result;
68 }
69
70 send_hdr->command = GET_SUBBUFFER;
71
72 result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
73 recv_hdr, (char *)recv_msg);
74 if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
75 result == 0) {
76 DBG("app died while being traced");
77 return GET_SUBBUF_DIED;
78 } else if (result < 0) {
79 ERR("get_subbuffer: ustcomm_req failed");
80 return result;
81 }
82
83 if (!recv_hdr->result) {
84 DBG("got subbuffer %s", buf->name);
85 buf->consumed_old = recv_msg->consumed_old;
86 return GET_SUBBUF_OK;
87 } else if (recv_hdr->result == -ENODATA) {
88 DBG("For buffer %s, the trace was not found. This likely means"
89 " it was destroyed by the user.", buf->name);
90 return GET_SUBBUF_DIED;
91 }
92
93 DBG("error getting subbuffer %s", buf->name);
94 return recv_hdr->result;
95 }
96
97 static int put_subbuffer(struct buffer_info *buf)
98 {
99 struct ustcomm_header _send_hdr, *send_hdr;
100 struct ustcomm_header _recv_hdr, *recv_hdr;
101 struct ustcomm_buffer_info _send_msg, *send_msg;
102 int result;
103
104 send_hdr = &_send_hdr;
105 recv_hdr = &_recv_hdr;
106 send_msg = &_send_msg;
107
108 result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
109 buf->channel, buf->channel_cpu);
110 if (result < 0) {
111 return result;
112 }
113
114 send_hdr->command = PUT_SUBBUFFER;
115 send_msg->consumed_old = buf->consumed_old;
116
117 result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
118 recv_hdr, NULL);
119 if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
120 result == 0) {
121 DBG("app died while being traced");
122 return PUT_SUBBUF_DIED;
123 } else if (result < 0) {
124 ERR("put_subbuffer: ustcomm_req failed");
125 return result;
126 }
127
128 if (!recv_hdr->result) {
129 DBG("put subbuffer %s", buf->name);
130 return PUT_SUBBUF_OK;
131 } else if (recv_hdr->result == -ENODATA) {
132 DBG("For buffer %s, the trace was not found. This likely means"
133 " it was destroyed by the user.", buf->name);
134 return PUT_SUBBUF_DIED;
135 }
136
137 DBG("error getting subbuffer %s", buf->name);
138 return recv_hdr->result;
139 }
140
141 void decrement_active_buffers(void *arg)
142 {
143 struct ustconsumer_instance *instance = arg;
144 pthread_mutex_lock(&instance->mutex);
145 instance->active_buffers--;
146 pthread_mutex_unlock(&instance->mutex);
147 }
148
149 static int get_pidunique(int sock, s64 *pidunique)
150 {
151 struct ustcomm_header _send_hdr, *send_hdr;
152 struct ustcomm_header _recv_hdr, *recv_hdr;
153 struct ustcomm_pidunique _recv_msg, *recv_msg;
154 int result;
155
156 send_hdr = &_send_hdr;
157 recv_hdr = &_recv_hdr;
158 recv_msg = &_recv_msg;
159
160 memset(send_hdr, 0, sizeof(*send_hdr));
161
162 send_hdr->command = GET_PIDUNIQUE;
163 result = ustcomm_req(sock, send_hdr, NULL, recv_hdr, (char *)recv_msg);
164 if (result < 1) {
165 return -ENOTCONN;
166 }
167 if (recv_hdr->result < 0) {
168 ERR("App responded with error: %s", strerror(recv_hdr->result));
169 return recv_hdr->result;
170 }
171
172 *pidunique = recv_msg->pidunique;
173
174 return 0;
175 }
176
177 static int get_buf_shmid_pipe_fd(int sock, struct buffer_info *buf,
178 int *buf_shmid, int *buf_struct_shmid,
179 int *buf_pipe_fd)
180 {
181 struct ustcomm_header _send_hdr, *send_hdr;
182 struct ustcomm_header _recv_hdr, *recv_hdr;
183 struct ustcomm_buffer_info _send_msg, *send_msg;
184 struct ustcomm_buffer_info _recv_msg, *recv_msg;
185 int result, recv_pipe_fd;
186
187 send_hdr = &_send_hdr;
188 recv_hdr = &_recv_hdr;
189 send_msg = &_send_msg;
190 recv_msg = &_recv_msg;
191
192 result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
193 buf->channel, buf->channel_cpu);
194 if (result < 0) {
195 ERR("Failed to pack buffer info");
196 return result;
197 }
198
199 send_hdr->command = GET_BUF_SHMID_PIPE_FD;
200
201 result = ustcomm_send(sock, send_hdr, (char *)send_msg);
202 if (result < 1) {
203 ERR("Failed to send request");
204 return -ENOTCONN;
205 }
206 result = ustcomm_recv_fd(sock, recv_hdr, (char *)recv_msg, &recv_pipe_fd);
207 if (result < 1) {
208 ERR("Failed to receive message and fd");
209 return -ENOTCONN;
210 }
211 if (recv_hdr->result < 0) {
212 ERR("App responded with error %s", strerror(recv_hdr->result));
213 return recv_hdr->result;
214 }
215
216 *buf_shmid = recv_msg->buf_shmid;
217 *buf_struct_shmid = recv_msg->buf_struct_shmid;
218 *buf_pipe_fd = recv_pipe_fd;
219
220 return 0;
221 }
222
223 static int get_subbuf_num_size(int sock, struct buffer_info *buf,
224 int *subbuf_num, int *subbuf_size)
225 {
226 struct ustcomm_header _send_hdr, *send_hdr;
227 struct ustcomm_header _recv_hdr, *recv_hdr;
228 struct ustcomm_channel_info _send_msg, *send_msg;
229 struct ustcomm_channel_info _recv_msg, *recv_msg;
230 int result;
231
232 send_hdr = &_send_hdr;
233 recv_hdr = &_recv_hdr;
234 send_msg = &_send_msg;
235 recv_msg = &_recv_msg;
236
237 result = ustcomm_pack_channel_info(send_hdr, send_msg, buf->trace,
238 buf->channel);
239 if (result < 0) {
240 return result;
241 }
242
243 send_hdr->command = GET_SUBBUF_NUM_SIZE;
244
245 result = ustcomm_req(sock, send_hdr, (char *)send_msg,
246 recv_hdr, (char *)recv_msg);
247 if (result < 1) {
248 return -ENOTCONN;
249 }
250
251 *subbuf_num = recv_msg->subbuf_num;
252 *subbuf_size = recv_msg->subbuf_size;
253
254 return recv_hdr->result;
255 }
256
257
258 static int notify_buffer_mapped(int sock, struct buffer_info *buf)
259 {
260 struct ustcomm_header _send_hdr, *send_hdr;
261 struct ustcomm_header _recv_hdr, *recv_hdr;
262 struct ustcomm_buffer_info _send_msg, *send_msg;
263 int result;
264
265 send_hdr = &_send_hdr;
266 recv_hdr = &_recv_hdr;
267 send_msg = &_send_msg;
268
269 result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
270 buf->channel, buf->channel_cpu);
271 if (result < 0) {
272 return result;
273 }
274
275 send_hdr->command = NOTIFY_BUF_MAPPED;
276
277 result = ustcomm_req(sock, send_hdr, (char *)send_msg,
278 recv_hdr, NULL);
279 if (result < 1) {
280 return -ENOTCONN;
281 }
282
283 return recv_hdr->result;
284 }
285
286
287 struct buffer_info *connect_buffer(struct ustconsumer_instance *instance, pid_t pid,
288 const char *trace, const char *channel,
289 int channel_cpu)
290 {
291 struct buffer_info *buf;
292 int result;
293 struct shmid_ds shmds;
294
295 buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
296 if(buf == NULL) {
297 ERR("add_buffer: insufficient memory");
298 return NULL;
299 }
300
301 buf->trace = strdup(trace);
302 if (!buf->trace) {
303 goto free_buf;
304 }
305
306 buf->channel = strdup(channel);
307 if (!buf->channel) {
308 goto free_buf_trace;
309 }
310
311 result = asprintf(&buf->name, "%s_%d", channel, channel_cpu);
312 if (result < 0 || buf->name == NULL) {
313 goto free_buf_channel;
314 }
315
316 buf->channel_cpu = channel_cpu;
317 buf->pid = pid;
318
319 result = ustcomm_connect_app(buf->pid, &buf->app_sock);
320 if(result) {
321 WARN("unable to connect to process, it probably died before we were able to connect");
322 goto free_buf_name;
323 }
324
325 /* get pidunique */
326 result = get_pidunique(buf->app_sock, &buf->pidunique);
327 if (result < 0) {
328 ERR("Failed to get pidunique");
329 goto close_app_sock;
330 }
331
332 /* get shmid and pipe fd */
333 result = get_buf_shmid_pipe_fd(buf->app_sock, buf, &buf->shmid,
334 &buf->bufstruct_shmid, &buf->pipe_fd);
335 if (result < 0) {
336 ERR("Failed to get buf_shmid and pipe_fd");
337 goto close_app_sock;
338 } else {
339 struct stat temp;
340 fstat(buf->pipe_fd, &temp);
341 if (!S_ISFIFO(temp.st_mode)) {
342 ERR("Didn't receive a fifo from the app");
343 goto close_app_sock;
344 }
345 }
346
347
348 /* get number of subbufs and subbuf size */
349 result = get_subbuf_num_size(buf->app_sock, buf, &buf->n_subbufs,
350 &buf->subbuf_size);
351 if (result < 0) {
352 ERR("Failed to get subbuf number and size");
353 goto close_fifo;
354 }
355
356 /* attach memory */
357 buf->mem = shmat(buf->shmid, NULL, 0);
358 if(buf->mem == (void *) 0) {
359 PERROR("shmat");
360 goto close_fifo;
361 }
362 DBG("successfully attached buffer memory");
363
364 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
365 if(buf->bufstruct_mem == (void *) 0) {
366 PERROR("shmat");
367 goto shmdt_mem;
368 }
369 DBG("successfully attached buffer bufstruct memory");
370
371 /* obtain info on the memory segment */
372 result = shmctl(buf->shmid, IPC_STAT, &shmds);
373 if(result == -1) {
374 PERROR("shmctl");
375 goto shmdt_bufstruct_mem;
376 }
377 buf->memlen = shmds.shm_segsz;
378
379 /* Notify the application that we have mapped the buffer */
380 result = notify_buffer_mapped(buf->app_sock, buf);
381 if (result < 0) {
382 goto shmdt_bufstruct_mem;
383 }
384
385 if(instance->callbacks->on_open_buffer)
386 instance->callbacks->on_open_buffer(instance->callbacks, buf);
387
388 pthread_mutex_lock(&instance->mutex);
389 instance->active_buffers++;
390 pthread_mutex_unlock(&instance->mutex);
391
392 return buf;
393
394 shmdt_bufstruct_mem:
395 shmdt(buf->bufstruct_mem);
396
397 shmdt_mem:
398 shmdt(buf->mem);
399
400 close_fifo:
401 close(buf->pipe_fd);
402
403 close_app_sock:
404 close(buf->app_sock);
405
406 free_buf_name:
407 free(buf->name);
408
409 free_buf_channel:
410 free(buf->channel);
411
412 free_buf_trace:
413 free(buf->trace);
414
415 free_buf:
416 free(buf);
417 return NULL;
418 }
419
420 static void destroy_buffer(struct ustconsumer_callbacks *callbacks,
421 struct buffer_info *buf)
422 {
423 int result;
424
425 result = close(buf->app_sock);
426 if(result == -1) {
427 WARN("problem calling ustcomm_close_app");
428 }
429
430 result = shmdt(buf->mem);
431 if(result == -1) {
432 PERROR("shmdt");
433 }
434
435 result = shmdt(buf->bufstruct_mem);
436 if(result == -1) {
437 PERROR("shmdt");
438 }
439
440 if(callbacks->on_close_buffer)
441 callbacks->on_close_buffer(callbacks, buf);
442
443 free(buf);
444 }
445
446 int consumer_loop(struct ustconsumer_instance *instance, struct buffer_info *buf)
447 {
448 int result = 0;
449 int read_result;
450 char read_buf;
451
452 pthread_cleanup_push(decrement_active_buffers, instance);
453
454 for(;;) {
455 read_result = read(buf->pipe_fd, &read_buf, 1);
456 /* get the subbuffer */
457 if (read_result == 1) {
458 result = get_subbuffer(buf);
459 if (result < 0) {
460 ERR("error getting subbuffer");
461 continue;
462 } else if (result == GET_SUBBUF_DIED) {
463 finish_consuming_dead_subbuffer(instance->callbacks, buf);
464 break;
465 }
466 } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) ||
467 result == 0) {
468 DBG("App died while being traced");
469 finish_consuming_dead_subbuffer(instance->callbacks, buf);
470 break;
471 }
472
473 if(instance->callbacks->on_read_subbuffer)
474 instance->callbacks->on_read_subbuffer(instance->callbacks, buf);
475
476 /* put the subbuffer */
477 result = put_subbuffer(buf);
478 if(result == -1) {
479 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
480 break;
481 }
482 else if(result == PUT_SUBBUF_PUSHED) {
483 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
484 break;
485 }
486 else if(result == PUT_SUBBUF_DIED) {
487 DBG("application died while putting subbuffer");
488 /* Skip the first subbuffer. We are not sure it is trustable
489 * because the put_subbuffer() did not complete.
490 */
491 /* TODO: check on_put_error return value */
492 if(instance->callbacks->on_put_error)
493 instance->callbacks->on_put_error(instance->callbacks, buf);
494
495 finish_consuming_dead_subbuffer(instance->callbacks, buf);
496 break;
497 }
498 else if(result == PUT_SUBBUF_DONE) {
499 /* Done with this subbuffer */
500 /* FIXME: add a case where this branch is used? Upon
501 * normal trace termination, at put_subbuf time, a
502 * special last-subbuffer code could be returned by
503 * the listener.
504 */
505 break;
506 }
507 else if(result == PUT_SUBBUF_OK) {
508 }
509 }
510
511 DBG("thread for buffer %s is stopping", buf->name);
512
513 /* FIXME: destroy, unalloc... */
514
515 pthread_cleanup_pop(1);
516
517 return 0;
518 }
519
/*
 * Arguments handed from start_consuming_buffer() to consumer_thread().
 * The structure itself is heap-allocated by start_consuming_buffer().
 */
struct consumer_thread_args {
	pid_t pid;				/* traced process to connect to */
	const char *trace;			/* heap copy made with strdup() */
	const char *channel;			/* heap copy made with strdup() */
	int channel_cpu;			/* forms the buffer name "<channel>_<cpu>" */
	struct ustconsumer_instance *instance;	/* consumer instance the thread serves */
};
527
528 void *consumer_thread(void *arg)
529 {
530 struct buffer_info *buf;
531 struct consumer_thread_args *args = (struct consumer_thread_args *) arg;
532 int result;
533 sigset_t sigset;
534
535 if(args->instance->callbacks->on_new_thread)
536 args->instance->callbacks->on_new_thread(args->instance->callbacks);
537
538 /* Block signals that should be handled by the main thread. */
539 result = sigemptyset(&sigset);
540 if(result == -1) {
541 PERROR("sigemptyset");
542 goto end;
543 }
544 result = sigaddset(&sigset, SIGTERM);
545 if(result == -1) {
546 PERROR("sigaddset");
547 goto end;
548 }
549 result = sigaddset(&sigset, SIGINT);
550 if(result == -1) {
551 PERROR("sigaddset");
552 goto end;
553 }
554 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
555 if(result == -1) {
556 PERROR("sigprocmask");
557 goto end;
558 }
559
560 buf = connect_buffer(args->instance, args->pid, args->trace,
561 args->channel, args->channel_cpu);
562 if(buf == NULL) {
563 ERR("failed to connect to buffer");
564 goto end;
565 }
566
567 consumer_loop(args->instance, buf);
568
569 destroy_buffer(args->instance->callbacks, buf);
570
571 end:
572
573 if(args->instance->callbacks->on_close_thread)
574 args->instance->callbacks->on_close_thread(args->instance->callbacks);
575
576 free((void *)args->channel);
577 free(args);
578 return NULL;
579 }
580
581 int start_consuming_buffer(struct ustconsumer_instance *instance, pid_t pid,
582 const char *trace, const char *channel,
583 int channel_cpu)
584 {
585 pthread_t thr;
586 struct consumer_thread_args *args;
587 int result;
588
589 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid, channel,
590 channel_cpu);
591
592 args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args));
593 if (!args) {
594 return -ENOMEM;
595 }
596
597 args->pid = pid;
598 args->trace = strdup(trace);
599 args->channel = strdup(channel);
600 args->channel_cpu = channel_cpu;
601 args->instance = instance;
602 DBG("beginning2 of start_consuming_buffer: args: pid %d trace %s"
603 " bufname %s_%d", args->pid, args->trace, args->channel, args->channel_cpu);
604
605 result = pthread_create(&thr, NULL, consumer_thread, args);
606 if(result == -1) {
607 ERR("pthread_create failed");
608 return -1;
609 }
610 result = pthread_detach(thr);
611 if(result == -1) {
612 ERR("pthread_detach failed");
613 return -1;
614 }
615 DBG("end of start_consuming_buffer: args: pid %d trace %s "
616 "bufname %s_%d", args->pid, args->channel, args->trace, args->channel_cpu);
617
618 return 0;
619 }
620 static void process_client_cmd(int sock, struct ustcomm_header *req_header,
621 char *recvbuf, struct ustconsumer_instance *instance)
622 {
623 int result;
624 struct ustcomm_header _res_header;
625 struct ustcomm_header *res_header = &_res_header;
626 struct ustcomm_buffer_info *buf_inf;
627
628 DBG("Processing client command");
629
630 switch (req_header->command) {
631 case CONSUME_BUFFER:
632
633 buf_inf = (struct ustcomm_buffer_info *)recvbuf;
634 result = ustcomm_unpack_buffer_info(buf_inf);
635 if (result < 0) {
636 ERR("Couldn't unpack buffer info");
637 return;
638 }
639
640 DBG("Going to consume trace %s buffer %s_%d in process %d",
641 buf_inf->trace, buf_inf->channel, buf_inf->ch_cpu,
642 buf_inf->pid);
643 result = start_consuming_buffer(instance, buf_inf->pid,
644 buf_inf->trace,
645 buf_inf->channel,
646 buf_inf->ch_cpu);
647 if (result < 0) {
648 ERR("error in add_buffer");
649 return;
650 }
651
652 res_header->result = 0;
653 break;
654 case EXIT:
655 res_header->result = 0;
656 /* Only there to force poll to return */
657 break;
658 default:
659 res_header->result = -EINVAL;
660 WARN("unknown command: %d", req_header->command);
661 }
662
663 if (ustcomm_send(sock, res_header, NULL) <= 0) {
664 ERR("couldn't send command response");
665 }
666 }
667
668 #define MAX_EVENTS 10
669
670 int ustconsumer_start_instance(struct ustconsumer_instance *instance)
671 {
672 struct ustcomm_header recv_hdr;
673 char recv_buf[USTCOMM_BUFFER_SIZE];
674 struct ustcomm_sock *epoll_sock;
675 struct epoll_event events[MAX_EVENTS];
676 struct sockaddr addr;
677 int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
678
679 if(!instance->is_init) {
680 ERR("libustconsumer instance not initialized");
681 return 1;
682 }
683 epoll_fd = instance->epoll_fd;
684
685 timeout = -1;
686
687 /* app loop */
688 for(;;) {
689 nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
690 if (nfds == -1 && errno == EINTR) {
691 /* Caught signal */
692 } else if (nfds == -1) {
693 PERROR("ustconsumer_start_instance: epoll_wait failed");
694 continue;
695 }
696
697 for (i = 0; i < nfds; ++i) {
698 epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
699 if (epoll_sock == instance->listen_sock) {
700 addr_size = sizeof(struct sockaddr);
701 accept_fd = accept(epoll_sock->fd,
702 &addr,
703 (socklen_t *)&addr_size);
704 if (accept_fd == -1) {
705 PERROR("ustconsumer_start_instance: "
706 "accept failed");
707 continue;
708 }
709 ustcomm_init_sock(accept_fd, epoll_fd,
710 &instance->connections);
711 } else {
712 result = ustcomm_recv(epoll_sock->fd, &recv_hdr,
713 recv_buf);
714 if (result < 1) {
715 ustcomm_del_sock(epoll_sock, 0);
716 } else {
717 process_client_cmd(epoll_sock->fd,
718 &recv_hdr, recv_buf,
719 instance);
720 }
721
722 }
723 }
724
725 if (instance->quit_program) {
726 pthread_mutex_lock(&instance->mutex);
727 if(instance->active_buffers == 0) {
728 pthread_mutex_unlock(&instance->mutex);
729 break;
730 }
731 pthread_mutex_unlock(&instance->mutex);
732 timeout = 100;
733 }
734 }
735
736 if(instance->callbacks->on_trace_end)
737 instance->callbacks->on_trace_end(instance);
738
739 ustconsumer_delete_instance(instance);
740
741 return 0;
742 }
743
744 /* FIXME: threads and connections !? */
745 void ustconsumer_delete_instance(struct ustconsumer_instance *instance)
746 {
747 if (instance->is_init) {
748 ustcomm_del_named_sock(instance->listen_sock, 0);
749 close(instance->epoll_fd);
750 }
751
752 pthread_mutex_destroy(&instance->mutex);
753 free(instance->sock_path);
754 free(instance);
755 }
756
757 /* FIXME: Do something about the fixed path length, maybe get rid
758 * of the whole concept and use a pipe?
759 */
760 int ustconsumer_stop_instance(struct ustconsumer_instance *instance, int send_msg)
761 {
762 int result;
763 int fd;
764 int bytes = 0;
765
766 char msg[] = "exit";
767
768 instance->quit_program = 1;
769
770 if(!send_msg)
771 return 0;
772
773 /* Send a message through the socket to force poll to return */
774
775 struct sockaddr_un addr;
776
777 result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
778 if(result == -1) {
779 PERROR("socket");
780 return 1;
781 }
782
783 addr.sun_family = AF_UNIX;
784
785 strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX);
786 addr.sun_path[UNIX_PATH_MAX-1] = '\0';
787
788 result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
789 if(result == -1) {
790 PERROR("connect");
791 }
792
793 while(bytes != sizeof(msg))
794 bytes += send(fd, msg, sizeof(msg), 0);
795
796 close(fd);
797
798 return 0;
799 }
800
801 struct ustconsumer_instance
802 *ustconsumer_new_instance(struct ustconsumer_callbacks *callbacks,
803 char *sock_path)
804 {
805 struct ustconsumer_instance *instance =
806 zmalloc(sizeof(struct ustconsumer_instance));
807 if(!instance) {
808 return NULL;
809 }
810
811 instance->callbacks = callbacks;
812 instance->quit_program = 0;
813 instance->is_init = 0;
814 instance->active_buffers = 0;
815 pthread_mutex_init(&instance->mutex, NULL);
816
817 if (sock_path) {
818 instance->sock_path = strdup(sock_path);
819 } else {
820 instance->sock_path = NULL;
821 }
822
823 return instance;
824 }
825
826 static int init_ustconsumer_socket(struct ustconsumer_instance *instance)
827 {
828 char *name;
829
830 if (instance->sock_path) {
831 if (asprintf(&name, "%s", instance->sock_path) < 0) {
832 ERR("ustcomm_init_ustconsumer : asprintf failed (sock_path %s)",
833 instance->sock_path);
834 return -1;
835 }
836 } else {
837 int result;
838
839 /* Only check if socket dir exists if we are using the default directory */
840 result = ensure_dir_exists(SOCK_DIR);
841 if (result == -1) {
842 ERR("Unable to create socket directory %s", SOCK_DIR);
843 return -1;
844 }
845
846 if (asprintf(&name, "%s/%s", SOCK_DIR, "ustconsumer") < 0) {
847 ERR("ustcomm_init_ustconsumer : asprintf failed (%s/ustconsumer)",
848 SOCK_DIR);
849 return -1;
850 }
851 }
852
853 /* Set up epoll */
854 instance->epoll_fd = epoll_create(MAX_EVENTS);
855 if (instance->epoll_fd == -1) {
856 ERR("epoll_create failed, start instance bailing");
857 goto free_name;
858 }
859
860 /* Create the named socket */
861 instance->listen_sock = ustcomm_init_named_socket(name,
862 instance->epoll_fd);
863 if(!instance->listen_sock) {
864 ERR("error initializing named socket at %s", name);
865 goto close_epoll;
866 }
867
868 CDS_INIT_LIST_HEAD(&instance->connections);
869
870 free(name);
871
872 return 0;
873
874 close_epoll:
875 close(instance->epoll_fd);
876 free_name:
877 free(name);
878
879 return -1;
880 }
881
882 int ustconsumer_init_instance(struct ustconsumer_instance *instance)
883 {
884 int result;
885 result = init_ustconsumer_socket(instance);
886 if(result == -1) {
887 ERR("failed to initialize socket");
888 return 1;
889 }
890 instance->is_init = 1;
891 return 0;
892 }
893
This page took 0.050318 seconds and 4 git commands to generate.