a4c7e1f3f0c8a6dbdd17ac2fbf349a3fe7cb6f9d
2 * Copyright (C) 2009 Pierre-Marc Fournier
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include <sys/types.h>
33 struct list_head buffers
= LIST_HEAD_INIT(buffers
);
38 struct ustcomm_connection conn
;
43 /* the buffer memory */
47 /* number of subbuffers in buffer */
49 /* size of each subbuffer */
52 /* the buffer information struct */
55 int file_fd
; /* output file */
57 struct list_head list
;
62 /* return value: 0 = subbuffer is finished, it won't produce data anymore
63 * 1 = got subbuffer successfully
67 #define GET_SUBBUF_OK 1
68 #define GET_SUBBUF_DONE 0
69 #define GET_SUBBUF_DIED 2
71 int get_subbuffer(struct buffer_info
*buf
)
79 asprintf(&send_msg
, "get_subbuffer %s", buf
->name
);
80 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
83 ERR("get_subbuffer: ustcomm_send_request failed");
86 else if(result
== 0) {
87 DBG("app died while being traced");
88 return GET_SUBBUF_DIED
;
91 result
= sscanf(received_msg
, "%as %ld", &rep_code
, &buf
->consumed_old
);
92 if(result
!= 2 && result
!= 1) {
93 ERR("unable to parse response to get_subbuffer");
97 DBG("received msg is %s", received_msg
);
99 if(!strcmp(rep_code
, "OK")) {
100 DBG("got subbuffer %s", buf
->name
);
101 retval
= GET_SUBBUF_OK
;
103 else if(nth_token_is(received_msg
, "END", 0) == 1) {
104 return GET_SUBBUF_DONE
;
107 DBG("error getting subbuffer %s", buf
->name
);
111 /* FIMXE: free correctly the stuff */
117 int put_subbuffer(struct buffer_info
*buf
)
125 asprintf(&send_msg
, "put_subbuffer %s %ld", buf
->name
, buf
->consumed_old
);
126 result
= ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
128 ERR("put_subbuffer: send_message failed");
133 result
= sscanf(received_msg
, "%as", &rep_code
);
135 ERR("unable to parse response to put_subbuffer");
140 if(!strcmp(rep_code
, "OK")) {
141 DBG("subbuffer put %s", buf
->name
);
145 ERR("invalid response to put_subbuffer");
152 ssize_t
patient_write(int fd
, const void *buf
, size_t count
)
154 const char *bufc
= (const char *) buf
;
158 result
= write(fd
, bufc
, count
);
170 return bufc
-(const char *)buf
;
173 int get_subbuffer_died(struct buffer_info
*buf
)
178 //int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
180 // struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem;
182 ////ust// struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
183 // long consumed_old, consumed_idx, commit_count, write_offset;
184 // consumed_old = atomic_long_read(<t_buf->consumed);
185 // consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
186 // commit_count = local_read(<t_buf->commit_count[consumed_idx]);
188 // * Make sure we read the commit count before reading the buffer
189 // * data and the write offset. Correct consumed offset ordering
190 // * wrt commit count is insured by the use of cmpxchg to update
191 // * the consumed offset.
194 // write_offset = local_read(<t_buf->offset);
196 // * Check that the subbuffer we are trying to consume has been
197 // * already fully committed.
199 // if (((commit_count - buf->chan->subbuf_size)
200 // & ltt_channel->commit_count_mask)
201 // - (BUFFER_TRUNC(consumed_old, buf->chan)
202 // >> ltt_channel->n_subbufs_order)
207 // * Check that we are not about to read the same subbuffer in
208 // * which the writer head is.
210 // if ((SUBBUF_TRUNC(write_offset, buf->chan)
211 // - SUBBUF_TRUNC(consumed_old, buf->chan))
216 // *pconsumed_old = consumed_old;
220 void *consumer_thread(void *arg
)
222 struct buffer_info
*buf
= (struct buffer_info
*) arg
;
227 /* get the subbuffer */
229 result
= get_subbuffer(buf
);
231 ERR("error getting subbuffer");
234 else if(result
== GET_SUBBUF_DONE
) {
238 else if(result
== GET_SUBBUF_DIED
) {
243 result
= get_subbuffer_died(buf
);
249 /* write data to file */
250 result
= patient_write(buf
->file_fd
, buf
->mem
+ (buf
->consumed_old
& (buf
->n_subbufs
* buf
->subbuf_size
-1)), buf
->subbuf_size
);
253 /* FIXME: maybe drop this trace */
256 /* put the subbuffer */
258 result
= put_subbuffer(buf
);
260 ERR("error putting subbuffer");
265 // result = put_subbuffer_died(buf);
269 DBG("thread for buffer %s is stopping", buf
->name
);
271 /* FIXME: destroy, unalloc... */
276 int add_buffer(pid_t pid
, char *bufname
)
278 struct buffer_info
*buf
;
286 buf
= (struct buffer_info
*) malloc(sizeof(struct buffer_info
));
288 ERR("add_buffer: insufficient memory");
296 result
= ustcomm_connect_app(buf
->pid
, &buf
->conn
);
298 ERR("unable to connect to process");
303 asprintf(&send_msg
, "get_shmid %s", buf
->name
);
304 ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
306 DBG("got buffer name %s", buf
->name
);
308 result
= sscanf(received_msg
, "%d %d", &buf
->shmid
, &buf
->bufstruct_shmid
);
310 ERR("unable to parse response to get_shmid");
314 DBG("got shmids %d %d", buf
->shmid
, buf
->bufstruct_shmid
);
317 asprintf(&send_msg
, "get_n_subbufs %s", buf
->name
);
318 ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
321 result
= sscanf(received_msg
, "%d", &buf
->n_subbufs
);
323 ERR("unable to parse response to get_n_subbufs");
327 DBG("got n_subbufs %d", buf
->n_subbufs
);
329 /* get subbuf size */
330 asprintf(&send_msg
, "get_subbuf_size %s", buf
->name
);
331 ustcomm_send_request(&buf
->conn
, send_msg
, &received_msg
);
334 result
= sscanf(received_msg
, "%d", &buf
->subbuf_size
);
336 ERR("unable to parse response to get_subbuf_size");
340 DBG("got subbuf_size %d", buf
->subbuf_size
);
343 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
344 if(buf
->mem
== (void *) 0) {
348 DBG("successfully attached buffer memory");
350 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
351 if(buf
->bufstruct_mem
== (void *) 0) {
355 DBG("successfully attached buffer bufstruct memory");
357 /* open file for output */
358 asprintf(&tmp
, "/tmp/trace/%s_0", buf
->name
);
359 result
= fd
= open(tmp
, O_WRONLY
| O_CREAT
| O_TRUNC
, 00600);
367 //list_add(&buf->list, &buffers);
369 pthread_create(&thr
, NULL
, consumer_thread
, buf
);
374 int main(int argc
, char **argv
)
376 struct ustcomm_ustd ustd
;
379 result
= ustcomm_init_ustd(&ustd
);
381 ERR("failed to initialize socket");
389 /* check for requests on our public socket */
390 result
= ustcomm_ustd_recv_message(&ustd
, &recvbuf
, NULL
, 100);
392 ERR("error in ustcomm_ustd_recv_message");
396 if(!strncmp(recvbuf
, "collect", 7)) {
401 result
= sscanf(recvbuf
, "%*s %d %50as", &pid
, &bufname
);
403 fprintf(stderr
, "parsing error: %s\n", recvbuf
);
406 result
= add_buffer(pid
, bufname
);
408 ERR("error in add_buffer");
This page took 0.043205 seconds and 3 git commands to generate.