d5884dfe040c8bed7667a03e06784e444023c0e1
[lttng-ust.git] / ustd / ustd.c
1 /*
2 * Copyright (C) 2009 Pierre-Marc Fournier
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
#define _GNU_SOURCE

#include <sys/types.h>
#include <sys/shm.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "localerr.h"
#include "ustcomm.h"
32
33 struct list_head buffers = LIST_HEAD_INIT(buffers);
34
35 struct buffer_info {
36 char *name;
37 pid_t pid;
38 struct ustcomm_connection conn;
39
40 int shmid;
41 void *mem;
42 int memlen;
43
44 int n_subbufs;
45 int subbuf_size;
46
47 int file_fd; /* output file */
48
49 struct list_head list;
50
51 long consumed_old;
52 };
53
54 /* return value: 0 = subbuffer is finished, it won't produce data anymore
55 * 1 = got subbuffer successfully
56 * <0 = error
57 */
58
59 int get_subbuffer(struct buffer_info *buf)
60 {
61 char *send_msg;
62 char *received_msg;
63 char *rep_code;
64 int retval;
65 int result;
66
67 asprintf(&send_msg, "get_subbuffer %s", buf->name);
68 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
69 if(result < 0) {
70 ERR("get_subbuffer: ustcomm_send_request failed");
71 return -1;
72 }
73 free(send_msg);
74
75 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
76 if(result != 2 && result != 1) {
77 ERR("unable to parse response to get_subbuffer");
78 return -1;
79 }
80
81 DBG("received msg is %s", received_msg);
82
83 if(!strcmp(rep_code, "OK")) {
84 DBG("got subbuffer %s", buf->name);
85 retval = 1;
86 }
87 else if(nth_token_is(received_msg, "END", 0) == 1) {
88 return 0;
89 }
90 else {
91 DBG("error getting subbuffer %s", buf->name);
92 retval = -1;
93 }
94
95 /* FIMXE: free correctly the stuff */
96 free(received_msg);
97 free(rep_code);
98 return retval;
99 }
100
101 int put_subbuffer(struct buffer_info *buf)
102 {
103 char *send_msg;
104 char *received_msg;
105 char *rep_code;
106 int retval;
107 int result;
108
109 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
110 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
111 if(result < 0) {
112 ERR("put_subbuffer: send_message failed");
113 return -1;
114 }
115 free(send_msg);
116
117 result = sscanf(received_msg, "%as", &rep_code);
118 if(result != 1) {
119 ERR("unable to parse response to put_subbuffer");
120 return -1;
121 }
122 free(received_msg);
123
124 if(!strcmp(rep_code, "OK")) {
125 DBG("subbuffer put %s", buf->name);
126 retval = 1;
127 }
128 else {
129 ERR("invalid response to put_subbuffer");
130 }
131
132 free(rep_code);
133 return retval;
134 }
135
/* Write exactly count bytes from buf to fd, reissuing write() after short
 * writes and after interruption by a signal (EINTR).
 *
 * return value: the number of bytes written (== count) on success;
 *               otherwise the non-positive value returned by the failing
 *               write() call (-1 with errno set on error), even if an
 *               earlier iteration already wrote part of the data.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;

	for(;;) {
		result = write(fd, bufc, count);
		if(result == -1 && errno == EINTR) {
			/* Nothing was written; simply try again. */
			continue;
		}
		if(result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc - (const char *) buf;
}
156
157 void *consumer_thread(void *arg)
158 {
159 struct buffer_info *buf = (struct buffer_info *) arg;
160 int result;
161
162 for(;;) {
163 result = get_subbuffer(buf);
164 if(result == -1) {
165 ERR("error getting subbuffer");
166 continue;
167 }
168 if(result == 0) {
169 /* this is done */
170 break;
171 }
172
173 /* write data to file */
174 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
175 if(result == -1) {
176 PERROR("write");
177 /* FIXME: maybe drop this trace */
178 }
179
180 result = put_subbuffer(buf);
181 if(result == -1) {
182 ERR("error putting subbuffer");
183 break;
184 }
185 }
186
187 DBG("thread for buffer %s is stopping", buf->name);
188
189 return NULL;
190 }
191
192 int add_buffer(pid_t pid, char *bufname)
193 {
194 struct buffer_info *buf;
195 char *send_msg;
196 char *received_msg;
197 int result;
198 char *tmp;
199 int fd;
200 pthread_t thr;
201
202 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
203 if(buf == NULL) {
204 ERR("add_buffer: insufficient memory");
205 return -1;
206 }
207
208 buf->name = bufname;
209 buf->pid = pid;
210
211 /* connect to app */
212 result = ustcomm_connect_app(buf->pid, &buf->conn);
213 if(result) {
214 ERR("unable to connect to process");
215 return -1;
216 }
217
218 /* get shmid */
219 asprintf(&send_msg, "get_shmid %s", buf->name);
220 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
221 free(send_msg);
222 DBG("got buffer name %s", buf->name);
223
224 result = sscanf(received_msg, "%d", &buf->shmid);
225 if(result != 1) {
226 ERR("unable to parse response to get_shmid");
227 return -1;
228 }
229 free(received_msg);
230 DBG("got shmid %d", buf->shmid);
231
232 /* get n_subbufs */
233 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
234 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
235 free(send_msg);
236
237 result = sscanf(received_msg, "%d", &buf->n_subbufs);
238 if(result != 1) {
239 ERR("unable to parse response to get_n_subbufs");
240 return -1;
241 }
242 free(received_msg);
243 DBG("got n_subbufs %d", buf->n_subbufs);
244
245 /* get subbuf size */
246 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
247 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
248 free(send_msg);
249
250 result = sscanf(received_msg, "%d", &buf->subbuf_size);
251 if(result != 1) {
252 ERR("unable to parse response to get_subbuf_size");
253 return -1;
254 }
255 free(received_msg);
256 DBG("got subbuf_size %d", buf->subbuf_size);
257
258 /* attach memory */
259 buf->mem = shmat(buf->shmid, NULL, 0);
260 if(buf->mem == (void *) 0) {
261 perror("shmat");
262 return -1;
263 }
264 DBG("successfully attached memory");
265
266 /* open file for output */
267 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
268 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
269 if(result == -1) {
270 PERROR("open");
271 return -1;
272 }
273 buf->file_fd = fd;
274 free(tmp);
275
276 //list_add(&buf->list, &buffers);
277
278 pthread_create(&thr, NULL, consumer_thread, buf);
279
280 return 0;
281 }
282
283 int main(int argc, char **argv)
284 {
285 struct ustcomm_ustd ustd;
286 int result;
287
288 result = ustcomm_init_ustd(&ustd);
289 if(result == -1) {
290 ERR("failed to initialize socket");
291 return 1;
292 }
293
294 /* app loop */
295 for(;;) {
296 char *recvbuf;
297
298 /* check for requests on our public socket */
299 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
300 if(result == -1) {
301 ERR("error in ustcomm_ustd_recv_message");
302 continue;
303 }
304 if(result > 0) {
305 if(!strncmp(recvbuf, "collect", 7)) {
306 pid_t pid;
307 char *bufname;
308 int result;
309
310 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
311 if(result != 2) {
312 fprintf(stderr, "parsing error: %s\n", recvbuf);
313 }
314
315 result = add_buffer(pid, bufname);
316 if(result < 0) {
317 ERR("error in add_buffer");
318 continue;
319 }
320 }
321
322 free(recvbuf);
323 }
324 }
325
326 return 0;
327 }
This page took 0.035779 seconds and 3 git commands to generate.