Commit | Line | Data |
---|---|---|
d159ac37 AH |
1 | /* Copyright (C) 2009 Pierre-Marc Fournier |
2 | * 2010 Alexis Halle | |
3 | * | |
4 | * This library is free software; you can redistribute it and/or | |
5 | * modify it under the terms of the GNU Lesser General Public | |
6 | * License as published by the Free Software Foundation; either | |
7 | * version 2.1 of the License, or (at your option) any later version. | |
8 | * | |
9 | * This library is distributed in the hope that it will be useful, | |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
12 | * Lesser General Public License for more details. | |
13 | * | |
14 | * You should have received a copy of the GNU Lesser General Public | |
15 | * License along with this library; if not, write to the Free Software | |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
17 | */ | |
18 | ||
19 | #define _GNU_SOURCE | |
20 | ||
4723ca09 | 21 | #include <sys/epoll.h> |
d159ac37 | 22 | #include <sys/shm.h> |
4723ca09 NC |
23 | #include <sys/types.h> |
24 | #include <sys/stat.h> | |
d159ac37 AH |
25 | #include <unistd.h> |
26 | #include <pthread.h> | |
27 | #include <signal.h> | |
28 | ||
29 | #include <stdlib.h> | |
30 | #include <stdio.h> | |
31 | #include <string.h> | |
32 | #include <errno.h> | |
33 | #include <assert.h> | |
34 | ||
d6c9f207 AH |
35 | #include <ust/ustd.h> |
36 | #include "lowlevel.h" | |
d159ac37 AH |
37 | #include "usterr.h" |
38 | #include "ustcomm.h" | |
39 | ||
d159ac37 AH |
40 | #define GET_SUBBUF_OK 1 |
41 | #define GET_SUBBUF_DONE 0 | |
42 | #define GET_SUBBUF_DIED 2 | |
43 | ||
44 | #define PUT_SUBBUF_OK 1 | |
45 | #define PUT_SUBBUF_DIED 0 | |
46 | #define PUT_SUBBUF_PUSHED 2 | |
47 | #define PUT_SUBBUF_DONE 3 | |
48 | ||
49 | #define UNIX_PATH_MAX 108 | |
50 | ||
72098143 | 51 | static int get_subbuffer(struct buffer_info *buf) |
d159ac37 | 52 | { |
72098143 NC |
53 | struct ustcomm_header _send_hdr, *send_hdr; |
54 | struct ustcomm_header _recv_hdr, *recv_hdr; | |
55 | struct ustcomm_buffer_info _send_msg, _recv_msg; | |
56 | struct ustcomm_buffer_info *send_msg, *recv_msg; | |
d159ac37 AH |
57 | int result; |
58 | ||
72098143 NC |
59 | send_hdr = &_send_hdr; |
60 | recv_hdr = &_recv_hdr; | |
61 | send_msg = &_send_msg; | |
62 | recv_msg = &_recv_msg; | |
4723ca09 | 63 | |
72098143 NC |
64 | result = ustcomm_pack_buffer_info(send_hdr, send_msg, |
65 | buf->channel, buf->channel_cpu); | |
66 | if (result < 0) { | |
67 | return result; | |
d159ac37 AH |
68 | } |
69 | ||
72098143 NC |
70 | send_hdr->command = GET_SUBBUFFER; |
71 | ||
72 | result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg, | |
73 | recv_hdr, (char *)recv_msg); | |
74 | if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) || | |
75 | result == 0) { | |
76 | DBG("app died while being traced"); | |
77 | return GET_SUBBUF_DIED; | |
78 | } else if (result < 0) { | |
79 | ERR("get_subbuffer: ustcomm_req failed"); | |
80 | return result; | |
d159ac37 AH |
81 | } |
82 | ||
72098143 | 83 | if (!recv_hdr->result) { |
d159ac37 | 84 | DBG("got subbuffer %s", buf->name); |
72098143 NC |
85 | buf->consumed_old = recv_msg->consumed_old; |
86 | return GET_SUBBUF_OK; | |
87 | } else if (recv_hdr->result == -ENODATA) { | |
88 | DBG("For buffer %s, the trace was not found. This likely means" | |
89 | " it was destroyed by the user.", buf->name); | |
90 | return GET_SUBBUF_DIED; | |
d159ac37 AH |
91 | } |
92 | ||
72098143 NC |
93 | DBG("error getting subbuffer %s", buf->name); |
94 | return recv_hdr->result; | |
d159ac37 AH |
95 | } |
96 | ||
72098143 | 97 | static int put_subbuffer(struct buffer_info *buf) |
d159ac37 | 98 | { |
72098143 NC |
99 | struct ustcomm_header _send_hdr, *send_hdr; |
100 | struct ustcomm_header _recv_hdr, *recv_hdr; | |
101 | struct ustcomm_buffer_info _send_msg, *send_msg; | |
d159ac37 AH |
102 | int result; |
103 | ||
72098143 NC |
104 | send_hdr = &_send_hdr; |
105 | recv_hdr = &_recv_hdr; | |
106 | send_msg = &_send_msg; | |
d159ac37 | 107 | |
72098143 NC |
108 | result = ustcomm_pack_buffer_info(send_hdr, send_msg, |
109 | buf->channel, buf->channel_cpu); | |
110 | if (result < 0) { | |
111 | return result; | |
d159ac37 AH |
112 | } |
113 | ||
72098143 NC |
114 | send_hdr->command = PUT_SUBBUFFER; |
115 | send_msg->consumed_old = buf->consumed_old; | |
d159ac37 | 116 | |
72098143 NC |
117 | result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg, |
118 | recv_hdr, NULL); | |
119 | if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) || | |
120 | result == 0) { | |
121 | DBG("app died while being traced"); | |
122 | return PUT_SUBBUF_DIED; | |
123 | } else if (result < 0) { | |
124 | ERR("put_subbuffer: ustcomm_req failed"); | |
125 | return result; | |
126 | } | |
d159ac37 | 127 | |
72098143 NC |
128 | if (!recv_hdr->result) { |
129 | DBG("put subbuffer %s", buf->name); | |
130 | return PUT_SUBBUF_OK; | |
131 | } else if (recv_hdr->result == -ENODATA) { | |
132 | DBG("For buffer %s, the trace was not found. This likely means" | |
133 | " it was destroyed by the user.", buf->name); | |
134 | return PUT_SUBBUF_DIED; | |
135 | } | |
d159ac37 | 136 | |
72098143 NC |
137 | DBG("error getting subbuffer %s", buf->name); |
138 | return recv_hdr->result; | |
d159ac37 AH |
139 | } |
140 | ||
141 | void decrement_active_buffers(void *arg) | |
142 | { | |
143 | struct libustd_instance *instance = arg; | |
144 | pthread_mutex_lock(&instance->mutex); | |
145 | instance->active_buffers--; | |
146 | pthread_mutex_unlock(&instance->mutex); | |
147 | } | |
148 | ||
72098143 | 149 | static int get_pidunique(int sock, s64 *pidunique) |
d159ac37 | 150 | { |
72098143 NC |
151 | struct ustcomm_header _send_hdr, *send_hdr; |
152 | struct ustcomm_header _recv_hdr, *recv_hdr; | |
153 | struct ustcomm_pidunique _recv_msg, *recv_msg; | |
d159ac37 | 154 | int result; |
d159ac37 | 155 | |
72098143 NC |
156 | send_hdr = &_send_hdr; |
157 | recv_hdr = &_recv_hdr; | |
158 | recv_msg = &_recv_msg; | |
159 | ||
160 | memset(send_hdr, 0, sizeof(*send_hdr)); | |
161 | ||
162 | send_hdr->command = GET_PIDUNIQUE; | |
163 | result = ustcomm_req(sock, send_hdr, NULL, recv_hdr, (char *)recv_msg); | |
164 | if (result < 1) { | |
165 | return -ENOTCONN; | |
166 | } | |
167 | if (recv_hdr->result < 0) { | |
168 | ERR("App responded with error: %s", strerror(recv_hdr->result)); | |
169 | return recv_hdr->result; | |
d159ac37 AH |
170 | } |
171 | ||
72098143 | 172 | *pidunique = recv_msg->pidunique; |
d159ac37 | 173 | |
72098143 NC |
174 | return 0; |
175 | } | |
176 | ||
177 | static int get_buf_shmid_pipe_fd(int sock, struct buffer_info *buf, | |
178 | int *buf_shmid, int *buf_struct_shmid, | |
179 | int *buf_pipe_fd) | |
180 | { | |
181 | struct ustcomm_header _send_hdr, *send_hdr; | |
182 | struct ustcomm_header _recv_hdr, *recv_hdr; | |
183 | struct ustcomm_buffer_info _send_msg, *send_msg; | |
184 | struct ustcomm_buffer_info _recv_msg, *recv_msg; | |
185 | int result, recv_pipe_fd; | |
186 | ||
187 | send_hdr = &_send_hdr; | |
188 | recv_hdr = &_recv_hdr; | |
189 | send_msg = &_send_msg; | |
190 | recv_msg = &_recv_msg; | |
191 | ||
192 | result = ustcomm_pack_buffer_info(send_hdr, send_msg, | |
193 | buf->channel, buf->channel_cpu); | |
194 | if (result < 0) { | |
195 | ERR("Failed to pack buffer info"); | |
196 | return result; | |
d159ac37 AH |
197 | } |
198 | ||
72098143 NC |
199 | send_hdr->command = GET_BUF_SHMID_PIPE_FD; |
200 | ||
201 | result = ustcomm_send(sock, send_hdr, (char *)send_msg); | |
202 | if (result < 1) { | |
203 | ERR("Failed to send request"); | |
204 | return -ENOTCONN; | |
08b8805e | 205 | } |
72098143 NC |
206 | result = ustcomm_recv_fd(sock, recv_hdr, (char *)recv_msg, &recv_pipe_fd); |
207 | if (result < 1) { | |
208 | ERR("Failed to receive message and fd"); | |
209 | return -ENOTCONN; | |
d159ac37 | 210 | } |
72098143 NC |
211 | if (recv_hdr->result < 0) { |
212 | ERR("App responded with error %s", strerror(recv_hdr->result)); | |
213 | return recv_hdr->result; | |
d159ac37 AH |
214 | } |
215 | ||
72098143 NC |
216 | *buf_shmid = recv_msg->buf_shmid; |
217 | *buf_struct_shmid = recv_msg->buf_struct_shmid; | |
218 | *buf_pipe_fd = recv_pipe_fd; | |
d159ac37 | 219 | |
72098143 NC |
220 | return 0; |
221 | } | |
222 | ||
223 | static int get_subbuf_num_size(int sock, struct buffer_info *buf, | |
224 | int *subbuf_num, int *subbuf_size) | |
225 | { | |
226 | struct ustcomm_header _send_hdr, *send_hdr; | |
227 | struct ustcomm_header _recv_hdr, *recv_hdr; | |
228 | struct ustcomm_channel_info _send_msg, *send_msg; | |
229 | struct ustcomm_channel_info _recv_msg, *recv_msg; | |
230 | int result; | |
231 | ||
232 | send_hdr = &_send_hdr; | |
233 | recv_hdr = &_recv_hdr; | |
234 | send_msg = &_send_msg; | |
235 | recv_msg = &_recv_msg; | |
236 | ||
237 | result = ustcomm_pack_channel_info(send_hdr, send_msg, | |
238 | buf->channel); | |
239 | if (result < 0) { | |
240 | return result; | |
d159ac37 | 241 | } |
72098143 NC |
242 | |
243 | send_hdr->command = GET_SUBBUF_NUM_SIZE; | |
244 | ||
245 | result = ustcomm_req(sock, send_hdr, (char *)send_msg, | |
246 | recv_hdr, (char *)recv_msg); | |
247 | if (result < 1) { | |
248 | return -ENOTCONN; | |
d159ac37 AH |
249 | } |
250 | ||
72098143 NC |
251 | *subbuf_num = recv_msg->subbuf_num; |
252 | *subbuf_size = recv_msg->subbuf_size; | |
253 | ||
254 | return recv_hdr->result; | |
255 | } | |
256 | ||
257 | ||
258 | static int notify_buffer_mapped(int sock, struct buffer_info *buf) | |
259 | { | |
260 | struct ustcomm_header _send_hdr, *send_hdr; | |
261 | struct ustcomm_header _recv_hdr, *recv_hdr; | |
262 | struct ustcomm_buffer_info _send_msg, *send_msg; | |
263 | int result; | |
264 | ||
265 | send_hdr = &_send_hdr; | |
266 | recv_hdr = &_recv_hdr; | |
267 | send_msg = &_send_msg; | |
268 | ||
269 | result = ustcomm_pack_buffer_info(send_hdr, send_msg, | |
270 | buf->channel, buf->channel_cpu); | |
271 | if (result < 0) { | |
272 | return result; | |
d159ac37 | 273 | } |
d159ac37 | 274 | |
72098143 NC |
275 | send_hdr->command = NOTIFY_BUF_MAPPED; |
276 | ||
277 | result = ustcomm_req(sock, send_hdr, (char *)send_msg, | |
278 | recv_hdr, NULL); | |
279 | if (result < 1) { | |
280 | return -ENOTCONN; | |
08b8805e | 281 | } |
72098143 NC |
282 | |
283 | return recv_hdr->result; | |
284 | } | |
285 | ||
286 | ||
287 | struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, | |
288 | const char *channel, int channel_cpu) | |
289 | { | |
290 | struct buffer_info *buf; | |
291 | int result; | |
292 | struct shmid_ds shmds; | |
293 | ||
294 | buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info)); | |
295 | if(buf == NULL) { | |
296 | ERR("add_buffer: insufficient memory"); | |
d159ac37 AH |
297 | return NULL; |
298 | } | |
d159ac37 | 299 | |
72098143 NC |
300 | buf->channel = strdup(channel); |
301 | if (!buf->channel) { | |
302 | goto free_buf; | |
d159ac37 | 303 | } |
d159ac37 | 304 | |
72098143 NC |
305 | result = asprintf(&buf->name, "%s_%d", channel, channel_cpu); |
306 | if (result < 0 || buf->name == NULL) { | |
307 | goto free_buf_channel; | |
08b8805e | 308 | } |
72098143 NC |
309 | |
310 | buf->channel_cpu = channel_cpu; | |
311 | buf->pid = pid; | |
312 | ||
313 | result = ustcomm_connect_app(buf->pid, &buf->app_sock); | |
314 | if(result) { | |
315 | WARN("unable to connect to process, it probably died before we were able to connect"); | |
316 | goto free_buf_name; | |
d159ac37 | 317 | } |
72098143 NC |
318 | |
319 | /* get pidunique */ | |
320 | result = get_pidunique(buf->app_sock, &buf->pidunique); | |
321 | if (result < 0) { | |
322 | ERR("Failed to get pidunique"); | |
323 | goto close_app_sock; | |
324 | } | |
325 | ||
326 | /* get shmid and pipe fd */ | |
327 | result = get_buf_shmid_pipe_fd(buf->app_sock, buf, &buf->shmid, | |
328 | &buf->bufstruct_shmid, &buf->pipe_fd); | |
329 | if (result < 0) { | |
330 | ERR("Failed to get buf_shmid and pipe_fd"); | |
331 | goto close_app_sock; | |
332 | } else { | |
333 | struct stat temp; | |
334 | fstat(buf->pipe_fd, &temp); | |
335 | if (!S_ISFIFO(temp.st_mode)) { | |
336 | ERR("Didn't receive a fifo from the app"); | |
337 | goto close_app_sock; | |
338 | } | |
d159ac37 AH |
339 | } |
340 | ||
72098143 NC |
341 | |
342 | /* get number of subbufs and subbuf size */ | |
343 | result = get_subbuf_num_size(buf->app_sock, buf, &buf->n_subbufs, | |
344 | &buf->subbuf_size); | |
345 | if (result < 0) { | |
346 | ERR("Failed to get subbuf number and size"); | |
347 | goto close_fifo; | |
d159ac37 | 348 | } |
d159ac37 AH |
349 | |
350 | /* attach memory */ | |
351 | buf->mem = shmat(buf->shmid, NULL, 0); | |
352 | if(buf->mem == (void *) 0) { | |
353 | PERROR("shmat"); | |
72098143 | 354 | goto close_fifo; |
d159ac37 AH |
355 | } |
356 | DBG("successfully attached buffer memory"); | |
357 | ||
358 | buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0); | |
359 | if(buf->bufstruct_mem == (void *) 0) { | |
360 | PERROR("shmat"); | |
72098143 | 361 | goto shmdt_mem; |
d159ac37 AH |
362 | } |
363 | DBG("successfully attached buffer bufstruct memory"); | |
364 | ||
365 | /* obtain info on the memory segment */ | |
366 | result = shmctl(buf->shmid, IPC_STAT, &shmds); | |
367 | if(result == -1) { | |
368 | PERROR("shmctl"); | |
72098143 | 369 | goto shmdt_bufstruct_mem; |
d159ac37 AH |
370 | } |
371 | buf->memlen = shmds.shm_segsz; | |
372 | ||
72098143 NC |
373 | /* Notify the application that we have mapped the buffer */ |
374 | result = notify_buffer_mapped(buf->app_sock, buf); | |
375 | if (result < 0) { | |
376 | goto shmdt_bufstruct_mem; | |
4723ca09 | 377 | } |
72098143 | 378 | |
d159ac37 AH |
379 | if(instance->callbacks->on_open_buffer) |
380 | instance->callbacks->on_open_buffer(instance->callbacks, buf); | |
381 | ||
382 | pthread_mutex_lock(&instance->mutex); | |
383 | instance->active_buffers++; | |
384 | pthread_mutex_unlock(&instance->mutex); | |
385 | ||
386 | return buf; | |
387 | ||
72098143 NC |
388 | shmdt_bufstruct_mem: |
389 | shmdt(buf->bufstruct_mem); | |
390 | ||
391 | shmdt_mem: | |
392 | shmdt(buf->mem); | |
393 | ||
394 | close_fifo: | |
395 | close(buf->pipe_fd); | |
396 | ||
397 | close_app_sock: | |
398 | close(buf->app_sock); | |
399 | ||
400 | free_buf_name: | |
401 | free(buf->name); | |
402 | ||
403 | free_buf_channel: | |
404 | free(buf->channel); | |
405 | ||
406 | free_buf: | |
d159ac37 AH |
407 | free(buf); |
408 | return NULL; | |
409 | } | |
410 | ||
411 | static void destroy_buffer(struct libustd_callbacks *callbacks, | |
412 | struct buffer_info *buf) | |
413 | { | |
414 | int result; | |
415 | ||
4723ca09 | 416 | result = close(buf->app_sock); |
d159ac37 AH |
417 | if(result == -1) { |
418 | WARN("problem calling ustcomm_close_app"); | |
419 | } | |
420 | ||
421 | result = shmdt(buf->mem); | |
422 | if(result == -1) { | |
423 | PERROR("shmdt"); | |
424 | } | |
425 | ||
426 | result = shmdt(buf->bufstruct_mem); | |
427 | if(result == -1) { | |
428 | PERROR("shmdt"); | |
429 | } | |
430 | ||
431 | if(callbacks->on_close_buffer) | |
432 | callbacks->on_close_buffer(callbacks, buf); | |
433 | ||
434 | free(buf); | |
435 | } | |
436 | ||
437 | int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf) | |
438 | { | |
4723ca09 NC |
439 | int result, read_result; |
440 | char read_buf; | |
d159ac37 AH |
441 | |
442 | pthread_cleanup_push(decrement_active_buffers, instance); | |
443 | ||
444 | for(;;) { | |
4723ca09 | 445 | read_result = read(buf->pipe_fd, &read_buf, 1); |
d159ac37 | 446 | /* get the subbuffer */ |
4723ca09 NC |
447 | if (read_result == 1) { |
448 | result = get_subbuffer(buf); | |
72098143 | 449 | if (result < 0) { |
4723ca09 NC |
450 | ERR("error getting subbuffer"); |
451 | continue; | |
452 | } else if (result == GET_SUBBUF_DIED) { | |
453 | finish_consuming_dead_subbuffer(instance->callbacks, buf); | |
454 | break; | |
455 | } | |
456 | } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) || | |
457 | result == 0) { | |
458 | DBG("App died while being traced"); | |
d159ac37 AH |
459 | finish_consuming_dead_subbuffer(instance->callbacks, buf); |
460 | break; | |
461 | } | |
462 | ||
463 | if(instance->callbacks->on_read_subbuffer) | |
464 | instance->callbacks->on_read_subbuffer(instance->callbacks, buf); | |
465 | ||
466 | /* put the subbuffer */ | |
467 | result = put_subbuffer(buf); | |
468 | if(result == -1) { | |
469 | ERR("unknown error putting subbuffer (channel=%s)", buf->name); | |
470 | break; | |
471 | } | |
472 | else if(result == PUT_SUBBUF_PUSHED) { | |
473 | ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name); | |
474 | break; | |
475 | } | |
476 | else if(result == PUT_SUBBUF_DIED) { | |
477 | DBG("application died while putting subbuffer"); | |
478 | /* Skip the first subbuffer. We are not sure it is trustable | |
479 | * because the put_subbuffer() did not complete. | |
480 | */ | |
481 | if(instance->callbacks->on_put_error) | |
482 | instance->callbacks->on_put_error(instance->callbacks, buf); | |
483 | ||
484 | finish_consuming_dead_subbuffer(instance->callbacks, buf); | |
485 | break; | |
486 | } | |
487 | else if(result == PUT_SUBBUF_DONE) { | |
488 | /* Done with this subbuffer */ | |
489 | /* FIXME: add a case where this branch is used? Upon | |
490 | * normal trace termination, at put_subbuf time, a | |
491 | * special last-subbuffer code could be returned by | |
492 | * the listener. | |
493 | */ | |
494 | break; | |
495 | } | |
496 | else if(result == PUT_SUBBUF_OK) { | |
497 | } | |
498 | } | |
499 | ||
500 | DBG("thread for buffer %s is stopping", buf->name); | |
501 | ||
502 | /* FIXME: destroy, unalloc... */ | |
503 | ||
504 | pthread_cleanup_pop(1); | |
505 | ||
506 | return 0; | |
507 | } | |
508 | ||
509 | struct consumer_thread_args { | |
510 | pid_t pid; | |
72098143 NC |
511 | const char *channel; |
512 | int channel_cpu; | |
d159ac37 AH |
513 | struct libustd_instance *instance; |
514 | }; | |
515 | ||
516 | void *consumer_thread(void *arg) | |
517 | { | |
518 | struct buffer_info *buf; | |
519 | struct consumer_thread_args *args = (struct consumer_thread_args *) arg; | |
520 | int result; | |
521 | sigset_t sigset; | |
522 | ||
d159ac37 AH |
523 | if(args->instance->callbacks->on_new_thread) |
524 | args->instance->callbacks->on_new_thread(args->instance->callbacks); | |
525 | ||
526 | /* Block signals that should be handled by the main thread. */ | |
527 | result = sigemptyset(&sigset); | |
528 | if(result == -1) { | |
529 | PERROR("sigemptyset"); | |
530 | goto end; | |
531 | } | |
532 | result = sigaddset(&sigset, SIGTERM); | |
533 | if(result == -1) { | |
534 | PERROR("sigaddset"); | |
535 | goto end; | |
536 | } | |
537 | result = sigaddset(&sigset, SIGINT); | |
538 | if(result == -1) { | |
539 | PERROR("sigaddset"); | |
540 | goto end; | |
541 | } | |
542 | result = sigprocmask(SIG_BLOCK, &sigset, NULL); | |
543 | if(result == -1) { | |
544 | PERROR("sigprocmask"); | |
545 | goto end; | |
546 | } | |
547 | ||
72098143 NC |
548 | buf = connect_buffer(args->instance, args->pid, |
549 | args->channel, args->channel_cpu); | |
d159ac37 AH |
550 | if(buf == NULL) { |
551 | ERR("failed to connect to buffer"); | |
552 | goto end; | |
553 | } | |
554 | ||
555 | consumer_loop(args->instance, buf); | |
556 | ||
557 | destroy_buffer(args->instance->callbacks, buf); | |
558 | ||
559 | end: | |
560 | ||
561 | if(args->instance->callbacks->on_close_thread) | |
562 | args->instance->callbacks->on_close_thread(args->instance->callbacks); | |
563 | ||
72098143 | 564 | free((void *)args->channel); |
d159ac37 AH |
565 | free(args); |
566 | return NULL; | |
567 | } | |
568 | ||
72098143 NC |
569 | int start_consuming_buffer(struct libustd_instance *instance, pid_t pid, |
570 | const char *channel, int channel_cpu) | |
d159ac37 AH |
571 | { |
572 | pthread_t thr; | |
573 | struct consumer_thread_args *args; | |
574 | int result; | |
575 | ||
72098143 NC |
576 | DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid, channel, |
577 | channel_cpu); | |
d159ac37 | 578 | |
7032c7d3 | 579 | args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args)); |
72098143 NC |
580 | if (!args) { |
581 | return -ENOMEM; | |
582 | } | |
d159ac37 AH |
583 | |
584 | args->pid = pid; | |
72098143 NC |
585 | args->channel = strdup(channel); |
586 | args->channel_cpu = channel_cpu; | |
d159ac37 | 587 | args->instance = instance; |
72098143 NC |
588 | DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s_%d", |
589 | args->pid, args->channel, args->channel_cpu); | |
d159ac37 AH |
590 | |
591 | result = pthread_create(&thr, NULL, consumer_thread, args); | |
592 | if(result == -1) { | |
593 | ERR("pthread_create failed"); | |
594 | return -1; | |
595 | } | |
596 | result = pthread_detach(thr); | |
597 | if(result == -1) { | |
598 | ERR("pthread_detach failed"); | |
599 | return -1; | |
600 | } | |
72098143 NC |
601 | DBG("end of start_consuming_buffer: args: pid %d bufname %s_%d", |
602 | args->pid, args->channel, args->channel_cpu); | |
d159ac37 AH |
603 | |
604 | return 0; | |
605 | } | |
72098143 NC |
606 | static void process_client_cmd(int sock, struct ustcomm_header *req_header, |
607 | char *recvbuf, struct libustd_instance *instance) | |
4723ca09 | 608 | { |
72098143 NC |
609 | int result; |
610 | struct ustcomm_header _res_header; | |
611 | struct ustcomm_header *res_header = &_res_header; | |
612 | struct ustcomm_buffer_info *buf_inf; | |
613 | ||
614 | DBG("Processing client command"); | |
615 | ||
616 | switch (req_header->command) { | |
617 | case CONSUME_BUFFER: | |
618 | ||
619 | buf_inf = (struct ustcomm_buffer_info *)recvbuf; | |
620 | result = ustcomm_unpack_buffer_info(buf_inf); | |
621 | if (result < 0) { | |
622 | ERR("Couldn't unpack buffer info"); | |
623 | return; | |
4723ca09 NC |
624 | } |
625 | ||
72098143 NC |
626 | DBG("Going to consume buffer %s_%d in process %d", |
627 | buf_inf->channel, buf_inf->ch_cpu, buf_inf->pid); | |
628 | result = start_consuming_buffer(instance, buf_inf->pid, | |
629 | buf_inf->channel, | |
630 | buf_inf->ch_cpu); | |
4723ca09 NC |
631 | if (result < 0) { |
632 | ERR("error in add_buffer"); | |
72098143 | 633 | return; |
4723ca09 NC |
634 | } |
635 | ||
72098143 NC |
636 | res_header->result = 0; |
637 | break; | |
638 | case EXIT: | |
639 | res_header->result = 0; | |
4723ca09 | 640 | /* Only there to force poll to return */ |
72098143 NC |
641 | break; |
642 | default: | |
643 | res_header->result = -EINVAL; | |
644 | WARN("unknown command: %d", req_header->command); | |
645 | } | |
646 | ||
647 | if (ustcomm_send(sock, res_header, NULL) <= 0) { | |
648 | ERR("couldn't send command response"); | |
4723ca09 NC |
649 | } |
650 | } | |
651 | ||
652 | #define MAX_EVENTS 10 | |
d159ac37 AH |
653 | |
654 | int libustd_start_instance(struct libustd_instance *instance) | |
655 | { | |
72098143 NC |
656 | struct ustcomm_header recv_hdr; |
657 | char recv_buf[USTCOMM_BUFFER_SIZE]; | |
4723ca09 NC |
658 | struct ustcomm_sock *epoll_sock; |
659 | struct epoll_event events[MAX_EVENTS]; | |
660 | struct sockaddr addr; | |
661 | int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout; | |
d159ac37 AH |
662 | |
663 | if(!instance->is_init) { | |
664 | ERR("libustd instance not initialized"); | |
665 | return 1; | |
666 | } | |
4723ca09 NC |
667 | epoll_fd = instance->epoll_fd; |
668 | ||
669 | timeout = -1; | |
d159ac37 AH |
670 | |
671 | /* app loop */ | |
672 | for(;;) { | |
4723ca09 NC |
673 | nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout); |
674 | if (nfds == -1 && errno == EINTR) { | |
d159ac37 | 675 | /* Caught signal */ |
4723ca09 NC |
676 | } else if (nfds == -1) { |
677 | PERROR("libustd_start_instance: epoll_wait failed"); | |
678 | continue; | |
d159ac37 | 679 | } |
d159ac37 | 680 | |
4723ca09 NC |
681 | for (i = 0; i < nfds; ++i) { |
682 | epoll_sock = (struct ustcomm_sock *)events[i].data.ptr; | |
683 | if (epoll_sock == instance->listen_sock) { | |
684 | addr_size = sizeof(struct sockaddr); | |
685 | accept_fd = accept(epoll_sock->fd, | |
686 | &addr, | |
687 | (socklen_t *)&addr_size); | |
688 | if (accept_fd == -1) { | |
689 | PERROR("libustd_start_instance: " | |
690 | "accept failed"); | |
691 | continue; | |
692 | } | |
693 | ustcomm_init_sock(accept_fd, epoll_fd, | |
694 | &instance->connections); | |
695 | } else { | |
72098143 NC |
696 | result = ustcomm_recv(epoll_sock->fd, &recv_hdr, |
697 | recv_buf); | |
698 | if (result < 1) { | |
4723ca09 | 699 | ustcomm_del_sock(epoll_sock, 0); |
72098143 NC |
700 | } else { |
701 | process_client_cmd(epoll_sock->fd, | |
702 | &recv_hdr, recv_buf, | |
703 | instance); | |
d159ac37 AH |
704 | } |
705 | ||
d159ac37 | 706 | } |
d159ac37 AH |
707 | } |
708 | ||
4723ca09 | 709 | if (instance->quit_program) { |
d159ac37 AH |
710 | pthread_mutex_lock(&instance->mutex); |
711 | if(instance->active_buffers == 0) { | |
712 | pthread_mutex_unlock(&instance->mutex); | |
713 | break; | |
714 | } | |
715 | pthread_mutex_unlock(&instance->mutex); | |
716 | timeout = 100; | |
717 | } | |
718 | } | |
719 | ||
720 | if(instance->callbacks->on_trace_end) | |
721 | instance->callbacks->on_trace_end(instance); | |
722 | ||
723 | libustd_delete_instance(instance); | |
724 | ||
725 | return 0; | |
726 | } | |
727 | ||
4723ca09 | 728 | /* FIXME: threads and connections !? */ |
d159ac37 AH |
729 | void libustd_delete_instance(struct libustd_instance *instance) |
730 | { | |
4723ca09 NC |
731 | if (instance->is_init) { |
732 | ustcomm_del_named_sock(instance->listen_sock, 0); | |
733 | close(instance->epoll_fd); | |
734 | } | |
d159ac37 AH |
735 | |
736 | pthread_mutex_destroy(&instance->mutex); | |
737 | free(instance->sock_path); | |
738 | free(instance); | |
739 | } | |
740 | ||
72098143 NC |
741 | /* FIXME: Do something about the fixed path length, maybe get rid |
742 | * of the whole concept and use a pipe? | |
743 | */ | |
d159ac37 AH |
744 | int libustd_stop_instance(struct libustd_instance *instance, int send_msg) |
745 | { | |
746 | int result; | |
747 | int fd; | |
748 | int bytes = 0; | |
749 | ||
750 | char msg[] = "exit"; | |
751 | ||
752 | instance->quit_program = 1; | |
753 | ||
754 | if(!send_msg) | |
755 | return 0; | |
756 | ||
757 | /* Send a message through the socket to force poll to return */ | |
758 | ||
759 | struct sockaddr_un addr; | |
760 | ||
761 | result = fd = socket(PF_UNIX, SOCK_STREAM, 0); | |
762 | if(result == -1) { | |
763 | PERROR("socket"); | |
764 | return 1; | |
765 | } | |
766 | ||
767 | addr.sun_family = AF_UNIX; | |
768 | ||
769 | strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX); | |
770 | addr.sun_path[UNIX_PATH_MAX-1] = '\0'; | |
771 | ||
772 | result = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); | |
773 | if(result == -1) { | |
774 | PERROR("connect"); | |
775 | } | |
776 | ||
777 | while(bytes != sizeof(msg)) | |
778 | bytes += send(fd, msg, sizeof(msg), 0); | |
779 | ||
780 | close(fd); | |
781 | ||
782 | return 0; | |
783 | } | |
784 | ||
4723ca09 NC |
785 | struct libustd_instance |
786 | *libustd_new_instance(struct libustd_callbacks *callbacks, | |
787 | char *sock_path) | |
d159ac37 AH |
788 | { |
789 | struct libustd_instance *instance = | |
7032c7d3 | 790 | zmalloc(sizeof(struct libustd_instance)); |
4723ca09 | 791 | if(!instance) { |
f3f8cc91 AH |
792 | return NULL; |
793 | } | |
794 | ||
d159ac37 AH |
795 | instance->callbacks = callbacks; |
796 | instance->quit_program = 0; | |
797 | instance->is_init = 0; | |
798 | instance->active_buffers = 0; | |
799 | pthread_mutex_init(&instance->mutex, NULL); | |
800 | ||
4723ca09 | 801 | if (sock_path) { |
d159ac37 | 802 | instance->sock_path = strdup(sock_path); |
4723ca09 | 803 | } else { |
d159ac37 | 804 | instance->sock_path = NULL; |
4723ca09 | 805 | } |
d159ac37 AH |
806 | |
807 | return instance; | |
808 | } | |
809 | ||
4723ca09 NC |
810 | static int init_ustd_socket(struct libustd_instance *instance) |
811 | { | |
812 | char *name; | |
813 | ||
814 | if (instance->sock_path) { | |
815 | if (asprintf(&name, "%s", instance->sock_path) < 0) { | |
816 | ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)", | |
817 | instance->sock_path); | |
818 | return -1; | |
819 | } | |
820 | } else { | |
821 | int result; | |
822 | ||
823 | /* Only check if socket dir exists if we are using the default directory */ | |
824 | result = ensure_dir_exists(SOCK_DIR); | |
825 | if (result == -1) { | |
826 | ERR("Unable to create socket directory %s", SOCK_DIR); | |
827 | return -1; | |
828 | } | |
829 | ||
830 | if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) { | |
831 | ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)", | |
832 | SOCK_DIR); | |
833 | return -1; | |
834 | } | |
835 | } | |
836 | ||
837 | /* Set up epoll */ | |
838 | instance->epoll_fd = epoll_create(MAX_EVENTS); | |
839 | if (instance->epoll_fd == -1) { | |
840 | ERR("epoll_create failed, start instance bailing"); | |
841 | goto free_name; | |
842 | } | |
843 | ||
844 | /* Create the named socket */ | |
845 | instance->listen_sock = ustcomm_init_named_socket(name, | |
846 | instance->epoll_fd); | |
847 | if(!instance->listen_sock) { | |
848 | ERR("error initializing named socket at %s", name); | |
849 | goto close_epoll; | |
850 | } | |
851 | ||
852 | INIT_LIST_HEAD(&instance->connections); | |
853 | ||
854 | free(name); | |
855 | ||
856 | return 0; | |
857 | ||
858 | close_epoll: | |
859 | close(instance->epoll_fd); | |
860 | free_name: | |
861 | free(name); | |
862 | ||
863 | return -1; | |
864 | } | |
865 | ||
d159ac37 AH |
866 | int libustd_init_instance(struct libustd_instance *instance) |
867 | { | |
868 | int result; | |
4723ca09 | 869 | result = init_ustd_socket(instance); |
d159ac37 AH |
870 | if(result == -1) { |
871 | ERR("failed to initialize socket"); | |
872 | return 1; | |
873 | } | |
874 | instance->is_init = 1; | |
875 | return 0; | |
876 | } | |
877 |