5b45b6c812b94263d92ce81b8f4a8a06a81b70f3
1 /* Copyright (C) 2009 Pierre-Marc Fournier
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include <sys/types.h>
37 /* return value: 0 = subbuffer is finished, it won't produce data anymore
38 * 1 = got subbuffer successfully
42 #define GET_SUBBUF_OK 1
43 #define GET_SUBBUF_DONE 0
44 #define GET_SUBBUF_DIED 2
46 #define PUT_SUBBUF_OK 1
47 #define PUT_SUBBUF_DIED 0
48 #define PUT_SUBBUF_PUSHED 2
50 int test_sigpipe(void)
55 result
= sigemptyset(&sigset
);
57 perror("sigemptyset");
60 result
= sigaddset(&sigset
, SIGPIPE
);
66 result
= sigtimedwait(&sigset
, NULL
, &(struct timespec
){0,0});
67 if(result
== -1 && errno
== EAGAIN
) {
68 /* no signal received */
71 else if(result
== -1) {
72 perror("sigtimedwait");
75 else if(result
== SIGPIPE
) {
76 /* received sigpipe */
84 int get_subbuffer(struct buffer_info
*buf
)
92 asprintf(&send_msg
, "get_subbuffer %s", buf
->name
);
93 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
96 WARN("process %d destroyed before we could connect to it", buf
->pid
);
97 return GET_SUBBUF_DONE
;
100 ERR("get_subbuffer: ustcomm_send_request failed");
103 else if(result
== 0) {
104 DBG("app died while being traced");
105 return GET_SUBBUF_DIED
;
108 result
= sscanf(received_msg
, "%as %ld", &rep_code
, &buf
->consumed_old
);
109 if(result
!= 2 && result
!= 1) {
110 ERR("unable to parse response to get_subbuffer");
114 DBG("received msg is %s", received_msg
);
116 if(!strcmp(rep_code
, "OK")) {
117 DBG("got subbuffer %s", buf
->name
);
118 retval
= GET_SUBBUF_OK
;
120 else if(nth_token_is(received_msg
, "END", 0) == 1) {
121 return GET_SUBBUF_DONE
;
124 DBG("error getting subbuffer %s", buf
->name
);
128 /* FIMXE: free correctly the stuff */
134 int put_subbuffer(struct buffer_info
*buf
)
142 asprintf(&send_msg
, "put_subbuffer %s %ld", buf
->name
, buf
->consumed_old
);
143 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
145 ERR("put_subbuffer: send_message failed");
150 result
= sscanf(received_msg
, "%as", &rep_code
);
152 ERR("unable to parse response to put_subbuffer");
157 if(!strcmp(rep_code
, "OK")) {
158 DBG("subbuffer put %s", buf
->name
);
159 retval
= PUT_SUBBUF_OK
;
162 DBG("put_subbuffer: received error, we were pushed");
163 return PUT_SUBBUF_PUSHED
;
170 /* This write is patient because it restarts if it was incomplete.
173 ssize_t
patient_write(int fd
, const void *buf
, size_t count
)
175 const char *bufc
= (const char *) buf
;
179 result
= write(fd
, bufc
, count
);
191 return bufc
-(const char *)buf
;
194 void *consumer_thread(void *arg
)
196 struct buffer_info
*buf
= (struct buffer_info
*) arg
;
200 /* get the subbuffer */
201 result
= get_subbuffer(buf
);
203 ERR("error getting subbuffer");
206 else if(result
== GET_SUBBUF_DONE
) {
210 else if(result
== GET_SUBBUF_DIED
) {
211 finish_consuming_dead_subbuffer(buf
);
215 /* write data to file */
216 result
= patient_write(buf
->file_fd
, buf
->mem
+ (buf
->consumed_old
& (buf
->n_subbufs
* buf
->subbuf_size
-1)), buf
->subbuf_size
);
219 /* FIXME: maybe drop this trace */
222 /* put the subbuffer */
223 result
= put_subbuffer(buf
);
225 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
228 else if(result
== PUT_SUBBUF_PUSHED
) {
229 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
232 else if(result
== PUT_SUBBUF_DIED
) {
233 WARN("application died while putting subbuffer");
234 /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
235 finish_consuming_dead_subbuffer(buf
);
237 else if(result
== PUT_SUBBUF_OK
) {
241 DBG("thread for buffer %s is stopping", buf
->name
);
243 /* FIXME: destroy, unalloc... */
248 int create_dir_if_needed(char *dir
)
251 result
= mkdir(dir
, 0777);
253 if(errno
!= EEXIST
) {
262 int add_buffer(pid_t pid
, char *bufname
)
264 struct buffer_info
*buf
;
271 struct shmid_ds shmds
;
273 buf
= (struct buffer_info
*) malloc(sizeof(struct buffer_info
));
275 ERR("add_buffer: insufficient memory");
283 result
= ustcomm_connect_app(buf
->pid
, &buf
->conn
);
285 WARN("unable to connect to process, it probably died before we were able to connect");
290 asprintf(&send_msg
, "get_shmid %s", buf
->name
);
291 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
294 ERR("problem in ustcomm_send_request(get_shmid)");
298 result
= sscanf(received_msg
, "%d %d", &buf
->shmid
, &buf
->bufstruct_shmid
);
300 ERR("unable to parse response to get_shmid");
304 DBG("got shmids %d %d", buf
->shmid
, buf
->bufstruct_shmid
);
307 asprintf(&send_msg
, "get_n_subbufs %s", buf
->name
);
308 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
311 ERR("problem in ustcomm_send_request(g_n_subbufs)");
315 result
= sscanf(received_msg
, "%d", &buf
->n_subbufs
);
317 ERR("unable to parse response to get_n_subbufs");
321 DBG("got n_subbufs %d", buf
->n_subbufs
);
323 /* get subbuf size */
324 asprintf(&send_msg
, "get_subbuf_size %s", buf
->name
);
325 ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
328 result
= sscanf(received_msg
, "%d", &buf
->subbuf_size
);
330 ERR("unable to parse response to get_subbuf_size");
334 DBG("got subbuf_size %d", buf
->subbuf_size
);
337 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
338 if(buf
->mem
== (void *) 0) {
342 DBG("successfully attached buffer memory");
344 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
345 if(buf
->bufstruct_mem
== (void *) 0) {
349 DBG("successfully attached buffer bufstruct memory");
351 /* obtain info on the memory segment */
352 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
357 buf
->memlen
= shmds
.shm_segsz
;
359 /* open file for output */
360 result
= create_dir_if_needed(USTD_DEFAULT_TRACE_PATH
);
362 ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH
);
366 asprintf(&tmp
, "%s/%u", USTD_DEFAULT_TRACE_PATH
, buf
->pid
);
367 result
= create_dir_if_needed(tmp
);
369 ERR("could not create directory %s", tmp
);
375 asprintf(&tmp
, "%s/%u/%s_0", USTD_DEFAULT_TRACE_PATH
, buf
->pid
, buf
->name
);
376 result
= fd
= open(tmp
, O_WRONLY
| O_CREAT
| O_TRUNC
, 00600);
379 ERR("failed opening trace file %s", tmp
);
385 pthread_create(&thr
, NULL
, consumer_thread
, buf
);
390 int main(int argc
, char **argv
)
392 struct ustcomm_ustd ustd
;
396 result
= ustcomm_init_ustd(&ustd
);
398 ERR("failed to initialize socket");
402 result
= sigemptyset(&sigset
);
404 perror("sigemptyset");
407 result
= sigaddset(&sigset
, SIGPIPE
);
412 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
414 perror("sigprocmask");
422 /* check for requests on our public socket */
423 result
= ustcomm_ustd_recv_message(&ustd
, &recvbuf
, NULL
, 100);
425 ERR("error in ustcomm_ustd_recv_message");
429 if(!strncmp(recvbuf
, "collect", 7)) {
434 result
= sscanf(recvbuf
, "%*s %d %50as", &pid
, &bufname
);
436 fprintf(stderr
, "parsing error: %s\n", recvbuf
);
439 result
= add_buffer(pid
, bufname
);
441 ERR("error in add_buffer");
This page took 0.045131 seconds and 3 git commands to generate.