continue working on build system
[ust.git] / ustd / ustd.c
1 /*
2 * Copyright (C) 2009 Pierre-Marc Fournier
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #define _GNU_SOURCE
19
20 #include <sys/types.h>
21 #include <sys/shm.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <pthread.h>
25
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <string.h>
29
30 #include "ustd.h"
31 #include "localerr.h"
32 #include "ustcomm.h"
33
/* Global list head meant to hold the buffers handled by this daemon.
 * NOTE(review): no code visible in this file links entries into it --
 * confirm whether it is still needed. */
struct list_head buffers = LIST_HEAD_INIT(buffers);
35
/* Return values of get_subbuffer():
 *   GET_SUBBUF_DONE (0) = subbuffer is finished, it won't produce data anymore
 *   GET_SUBBUF_OK   (1) = got subbuffer successfully
 *   GET_SUBBUF_DIED (2) = the traced application died
 *   <0                  = error
 */
enum {
	GET_SUBBUF_DONE = 0,
	GET_SUBBUF_OK = 1,
	GET_SUBBUF_DIED = 2
};
44
45 int get_subbuffer(struct buffer_info *buf)
46 {
47 char *send_msg;
48 char *received_msg;
49 char *rep_code;
50 int retval;
51 int result;
52
53 asprintf(&send_msg, "get_subbuffer %s", buf->name);
54 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
55 free(send_msg);
56 if(result < 0) {
57 ERR("get_subbuffer: ustcomm_send_request failed");
58 return -1;
59 }
60 else if(result == 0) {
61 DBG("app died while being traced");
62 return GET_SUBBUF_DIED;
63 }
64
65 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
66 if(result != 2 && result != 1) {
67 ERR("unable to parse response to get_subbuffer");
68 return -1;
69 }
70
71 DBG("received msg is %s", received_msg);
72
73 if(!strcmp(rep_code, "OK")) {
74 DBG("got subbuffer %s", buf->name);
75 retval = GET_SUBBUF_OK;
76 }
77 else if(nth_token_is(received_msg, "END", 0) == 1) {
78 return GET_SUBBUF_DONE;
79 }
80 else {
81 DBG("error getting subbuffer %s", buf->name);
82 retval = -1;
83 }
84
85 /* FIMXE: free correctly the stuff */
86 free(received_msg);
87 free(rep_code);
88 return retval;
89 }
90
91 int put_subbuffer(struct buffer_info *buf)
92 {
93 char *send_msg;
94 char *received_msg;
95 char *rep_code;
96 int retval;
97 int result;
98
99 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
100 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
101 if(result < 0) {
102 ERR("put_subbuffer: send_message failed");
103 return -1;
104 }
105 free(send_msg);
106
107 result = sscanf(received_msg, "%as", &rep_code);
108 if(result != 1) {
109 ERR("unable to parse response to put_subbuffer");
110 return -1;
111 }
112 free(received_msg);
113
114 if(!strcmp(rep_code, "OK")) {
115 DBG("subbuffer put %s", buf->name);
116 retval = 1;
117 }
118 else {
119 ERR("invalid response to put_subbuffer");
120 }
121
122 free(rep_code);
123 return retval;
124 }
125
/*
 * Write exactly count bytes from buf to fd, calling write() again after
 * every short write.
 *
 * Returns the number of bytes written (equal to count) once everything has
 * been written. If any write() call returns 0 or -1 that value is returned
 * as is, even when earlier calls already wrote part of the data; errno is
 * left as set by write().
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;	/* ssize_t, not int: write() may report more than INT_MAX */

	for(;;) {
		result = write(fd, bufc, count);
		if(result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc - (const char *) buf;
}
146
147 void *consumer_thread(void *arg)
148 {
149 struct buffer_info *buf = (struct buffer_info *) arg;
150 int result;
151
152 for(;;) {
153 /* get the subbuffer */
154 result = get_subbuffer(buf);
155 if(result == -1) {
156 ERR("error getting subbuffer");
157 continue;
158 }
159 else if(result == GET_SUBBUF_DONE) {
160 /* this is done */
161 break;
162 }
163 else if(result == GET_SUBBUF_DIED) {
164 finish_consuming_dead_subbuffer(buf);
165 break;
166 }
167
168 /* write data to file */
169 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
170 if(result == -1) {
171 PERROR("write");
172 /* FIXME: maybe drop this trace */
173 }
174
175 /* put the subbuffer */
176 result = put_subbuffer(buf);
177 if(result == -1) {
178 ERR("error putting subbuffer");
179 break;
180 }
181 }
182
183 DBG("thread for buffer %s is stopping", buf->name);
184
185 /* FIXME: destroy, unalloc... */
186
187 return NULL;
188 }
189
190 int add_buffer(pid_t pid, char *bufname)
191 {
192 struct buffer_info *buf;
193 char *send_msg;
194 char *received_msg;
195 int result;
196 char *tmp;
197 int fd;
198 pthread_t thr;
199
200 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
201 if(buf == NULL) {
202 ERR("add_buffer: insufficient memory");
203 return -1;
204 }
205
206 buf->name = bufname;
207 buf->pid = pid;
208
209 /* connect to app */
210 result = ustcomm_connect_app(buf->pid, &buf->conn);
211 if(result) {
212 ERR("unable to connect to process");
213 return -1;
214 }
215
216 /* get shmid */
217 asprintf(&send_msg, "get_shmid %s", buf->name);
218 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
219 free(send_msg);
220 DBG("got buffer name %s", buf->name);
221
222 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
223 if(result != 2) {
224 ERR("unable to parse response to get_shmid");
225 return -1;
226 }
227 free(received_msg);
228 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
229
230 /* get n_subbufs */
231 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
232 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
233 free(send_msg);
234
235 result = sscanf(received_msg, "%d", &buf->n_subbufs);
236 if(result != 1) {
237 ERR("unable to parse response to get_n_subbufs");
238 return -1;
239 }
240 free(received_msg);
241 DBG("got n_subbufs %d", buf->n_subbufs);
242
243 /* get subbuf size */
244 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
245 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
246 free(send_msg);
247
248 result = sscanf(received_msg, "%d", &buf->subbuf_size);
249 if(result != 1) {
250 ERR("unable to parse response to get_subbuf_size");
251 return -1;
252 }
253 free(received_msg);
254 DBG("got subbuf_size %d", buf->subbuf_size);
255
256 /* attach memory */
257 buf->mem = shmat(buf->shmid, NULL, 0);
258 if(buf->mem == (void *) 0) {
259 perror("shmat");
260 return -1;
261 }
262 DBG("successfully attached buffer memory");
263
264 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
265 if(buf->bufstruct_mem == (void *) 0) {
266 perror("shmat");
267 return -1;
268 }
269 DBG("successfully attached buffer bufstruct memory");
270
271 /* open file for output */
272 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
273 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
274 if(result == -1) {
275 PERROR("open");
276 ERR("failed opening trace file %s", tmp);
277 return -1;
278 }
279 buf->file_fd = fd;
280 free(tmp);
281
282 //list_add(&buf->list, &buffers);
283
284 pthread_create(&thr, NULL, consumer_thread, buf);
285
286 return 0;
287 }
288
289 int main(int argc, char **argv)
290 {
291 struct ustcomm_ustd ustd;
292 int result;
293
294 result = ustcomm_init_ustd(&ustd);
295 if(result == -1) {
296 ERR("failed to initialize socket");
297 return 1;
298 }
299
300 /* app loop */
301 for(;;) {
302 char *recvbuf;
303
304 /* check for requests on our public socket */
305 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
306 if(result == -1) {
307 ERR("error in ustcomm_ustd_recv_message");
308 continue;
309 }
310 if(result > 0) {
311 if(!strncmp(recvbuf, "collect", 7)) {
312 pid_t pid;
313 char *bufname;
314 int result;
315
316 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
317 if(result != 2) {
318 fprintf(stderr, "parsing error: %s\n", recvbuf);
319 }
320
321 result = add_buffer(pid, bufname);
322 if(result < 0) {
323 ERR("error in add_buffer");
324 continue;
325 }
326 }
327
328 free(recvbuf);
329 }
330 }
331
332 return 0;
333 }
This page took 0.038599 seconds and 5 git commands to generate.