start implementing sharing of buffer info
[lttng-ust.git] / ustd / ustd.c
CommitLineData
1f8b0dff
PMF
1/*
2 * Copyright (C) 2009 Pierre-Marc Fournier
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
3796af9b
PMF
18#define _GNU_SOURCE
19
20#include <sys/types.h>
21#include <sys/shm.h>
688760ef
PMF
22#include <fcntl.h>
23#include <unistd.h>
3a7b90de 24#include <pthread.h>
3796af9b
PMF
25
26#include <stdlib.h>
27#include <stdio.h>
28#include <string.h>
29
30#include "localerr.h"
31#include "ustcomm.h"
32
811e4b93
PMF
33struct list_head buffers = LIST_HEAD_INIT(buffers);
34
3796af9b
PMF
35struct buffer_info {
36 char *name;
37 pid_t pid;
4e2a8808 38 struct ustcomm_connection conn;
3796af9b
PMF
39
40 int shmid;
8cefc145
PMF
41 int bufstruct_shmid;
42
43 /* the buffer memory */
3796af9b 44 void *mem;
8cefc145 45 /* buffer size */
3796af9b 46 int memlen;
8cefc145 47 /* number of subbuffers in buffer */
811e4b93 48 int n_subbufs;
8cefc145 49 /* size of each subbuffer */
811e4b93
PMF
50 int subbuf_size;
51
8cefc145
PMF
52 /* the buffer information struct */
53 void *bufstruct_mem;
54
811e4b93 55 int file_fd; /* output file */
688760ef
PMF
56
57 struct list_head list;
58
59 long consumed_old;
3796af9b
PMF
60};
61
3a7b90de
PMF
/* Return values of get_subbuffer().
 *
 * Besides the three codes below, get_subbuffer() returns -1 on error
 * (request could not be sent, or the reply could not be understood).
 */

/* got subbuffer successfully */
#define GET_SUBBUF_OK 1
/* subbuffer is finished, it won't produce data anymore */
#define GET_SUBBUF_DONE 0
/* the request got no reply because the app died while being traced */
#define GET_SUBBUF_DIED 2
70
688760ef
PMF
71int get_subbuffer(struct buffer_info *buf)
72{
73 char *send_msg;
74 char *received_msg;
75 char *rep_code;
76 int retval;
77 int result;
78
79 asprintf(&send_msg, "get_subbuffer %s", buf->name);
3bb56863 80 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
8cefc145 81 free(send_msg);
688760ef 82 if(result < 0) {
3bb56863 83 ERR("get_subbuffer: ustcomm_send_request failed");
688760ef
PMF
84 return -1;
85 }
8cefc145
PMF
86 else if(result == 0) {
87 DBG("app died while being traced");
88 return GET_SUBBUF_DIED;
89 }
688760ef
PMF
90
91 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
3a7b90de 92 if(result != 2 && result != 1) {
688760ef
PMF
93 ERR("unable to parse response to get_subbuffer");
94 return -1;
95 }
3a7b90de
PMF
96
97 DBG("received msg is %s", received_msg);
688760ef
PMF
98
99 if(!strcmp(rep_code, "OK")) {
100 DBG("got subbuffer %s", buf->name);
8cefc145 101 retval = GET_SUBBUF_OK;
688760ef 102 }
3a7b90de 103 else if(nth_token_is(received_msg, "END", 0) == 1) {
8cefc145 104 return GET_SUBBUF_DONE;
3a7b90de 105 }
688760ef 106 else {
3a7b90de
PMF
107 DBG("error getting subbuffer %s", buf->name);
108 retval = -1;
688760ef
PMF
109 }
110
3a7b90de
PMF
111 /* FIMXE: free correctly the stuff */
112 free(received_msg);
688760ef
PMF
113 free(rep_code);
114 return retval;
115}
116
117int put_subbuffer(struct buffer_info *buf)
118{
119 char *send_msg;
120 char *received_msg;
121 char *rep_code;
122 int retval;
123 int result;
124
125 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
3bb56863 126 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
688760ef
PMF
127 if(result < 0) {
128 ERR("put_subbuffer: send_message failed");
129 return -1;
130 }
131 free(send_msg);
132
133 result = sscanf(received_msg, "%as", &rep_code);
134 if(result != 1) {
135 ERR("unable to parse response to put_subbuffer");
136 return -1;
137 }
138 free(received_msg);
139
140 if(!strcmp(rep_code, "OK")) {
141 DBG("subbuffer put %s", buf->name);
142 retval = 1;
143 }
144 else {
145 ERR("invalid response to put_subbuffer");
146 }
147
148 free(rep_code);
149 return retval;
150}
151
/*
 * Write `count` bytes from `buf` to `fd`, calling write() again after
 * every short write until everything has been written.
 *
 * Returns the number of bytes written (== count) on success. If a write()
 * call returns 0 or -1, that value is returned as is, even if earlier
 * calls already wrote part of the data.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	/* ssize_t, not int: write() may report more than INT_MAX bytes */
	ssize_t result;

	for(;;) {
		result = write(fd, bufc, count);
		if(result <= 0) {
			return result;
		}
		count -= result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc-(const char *)buf;
}
172
8cefc145
PMF
/*
 * Placeholder for obtaining a subbuffer once the traced application has
 * died. Not implemented yet: it always returns 0, which consumer_thread()
 * (result <= 0) takes as the signal to stop.
 */
int get_subbuffer_died(struct buffer_info *buf)
{
	(void) buf;	/* not used by the stub */

	return 0;
}
177
178//int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
179//{
180// struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem;
181//
182////ust// struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
183// long consumed_old, consumed_idx, commit_count, write_offset;
184// consumed_old = atomic_long_read(&ltt_buf->consumed);
185// consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
186// commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
187// /*
188// * Make sure we read the commit count before reading the buffer
189// * data and the write offset. Correct consumed offset ordering
190// * wrt commit count is insured by the use of cmpxchg to update
191// * the consumed offset.
192// */
193// smp_rmb();
194// write_offset = local_read(&ltt_buf->offset);
195// /*
196// * Check that the subbuffer we are trying to consume has been
197// * already fully committed.
198// */
199// if (((commit_count - buf->chan->subbuf_size)
200// & ltt_channel->commit_count_mask)
201// - (BUFFER_TRUNC(consumed_old, buf->chan)
202// >> ltt_channel->n_subbufs_order)
203// != 0) {
204// return -EAGAIN;
205// }
206// /*
207// * Check that we are not about to read the same subbuffer in
208// * which the writer head is.
209// */
210// if ((SUBBUF_TRUNC(write_offset, buf->chan)
211// - SUBBUF_TRUNC(consumed_old, buf->chan))
212// == 0) {
213// return -EAGAIN;
214// }
215//
216// *pconsumed_old = consumed_old;
217// return 0;
218//}
219
3a7b90de
PMF
220void *consumer_thread(void *arg)
221{
222 struct buffer_info *buf = (struct buffer_info *) arg;
223 int result;
8cefc145 224 int died = 0;
3a7b90de
PMF
225
226 for(;;) {
8cefc145
PMF
227 /* get the subbuffer */
228 if(died == 0) {
229 result = get_subbuffer(buf);
230 if(result == -1) {
231 ERR("error getting subbuffer");
232 continue;
233 }
234 else if(result == GET_SUBBUF_DONE) {
235 /* this is done */
236 break;
237 }
238 else if(result == GET_SUBBUF_DIED) {
239 died = 1;
240 }
3a7b90de 241 }
8cefc145
PMF
242 if(died == 1) {
243 result = get_subbuffer_died(buf);
244 if(result <= 0) {
245 break;
246 }
3a7b90de
PMF
247 }
248
249 /* write data to file */
250 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
251 if(result == -1) {
252 PERROR("write");
253 /* FIXME: maybe drop this trace */
254 }
255
8cefc145
PMF
256 /* put the subbuffer */
257 if(died == 0) {
258 result = put_subbuffer(buf);
259 if(result == -1) {
260 ERR("error putting subbuffer");
261 break;
262 }
263 }
264 else {
265// result = put_subbuffer_died(buf);
3a7b90de
PMF
266 }
267 }
268
269 DBG("thread for buffer %s is stopping", buf->name);
270
8cefc145
PMF
271 /* FIXME: destroy, unalloc... */
272
3a7b90de
PMF
273 return NULL;
274}
275
276int add_buffer(pid_t pid, char *bufname)
277{
278 struct buffer_info *buf;
279 char *send_msg;
280 char *received_msg;
281 int result;
282 char *tmp;
283 int fd;
284 pthread_t thr;
285
286 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
287 if(buf == NULL) {
288 ERR("add_buffer: insufficient memory");
289 return -1;
290 }
291
292 buf->name = bufname;
293 buf->pid = pid;
294
4e2a8808
PMF
295 /* connect to app */
296 result = ustcomm_connect_app(buf->pid, &buf->conn);
297 if(result) {
298 ERR("unable to connect to process");
299 return -1;
300 }
301
3a7b90de
PMF
302 /* get shmid */
303 asprintf(&send_msg, "get_shmid %s", buf->name);
4e2a8808 304 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
305 free(send_msg);
306 DBG("got buffer name %s", buf->name);
307
8cefc145
PMF
308 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
309 if(result != 2) {
3a7b90de
PMF
310 ERR("unable to parse response to get_shmid");
311 return -1;
312 }
313 free(received_msg);
8cefc145 314 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
3a7b90de
PMF
315
316 /* get n_subbufs */
317 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
4e2a8808 318 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
319 free(send_msg);
320
321 result = sscanf(received_msg, "%d", &buf->n_subbufs);
322 if(result != 1) {
323 ERR("unable to parse response to get_n_subbufs");
324 return -1;
325 }
326 free(received_msg);
327 DBG("got n_subbufs %d", buf->n_subbufs);
328
329 /* get subbuf size */
330 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
4e2a8808 331 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
332 free(send_msg);
333
334 result = sscanf(received_msg, "%d", &buf->subbuf_size);
335 if(result != 1) {
336 ERR("unable to parse response to get_subbuf_size");
337 return -1;
338 }
339 free(received_msg);
340 DBG("got subbuf_size %d", buf->subbuf_size);
341
342 /* attach memory */
343 buf->mem = shmat(buf->shmid, NULL, 0);
344 if(buf->mem == (void *) 0) {
345 perror("shmat");
346 return -1;
347 }
8cefc145
PMF
348 DBG("successfully attached buffer memory");
349
350 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
351 if(buf->bufstruct_mem == (void *) 0) {
352 perror("shmat");
353 return -1;
354 }
355 DBG("successfully attached buffer bufstruct memory");
3a7b90de
PMF
356
357 /* open file for output */
358 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
359 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
360 if(result == -1) {
361 PERROR("open");
362 return -1;
363 }
364 buf->file_fd = fd;
365 free(tmp);
366
367 //list_add(&buf->list, &buffers);
368
369 pthread_create(&thr, NULL, consumer_thread, buf);
370
371 return 0;
372}
373
3796af9b
PMF
374int main(int argc, char **argv)
375{
376 struct ustcomm_ustd ustd;
377 int result;
378
379 result = ustcomm_init_ustd(&ustd);
380 if(result == -1) {
381 ERR("failed to initialize socket");
382 return 1;
383 }
384
688760ef 385 /* app loop */
3796af9b
PMF
386 for(;;) {
387 char *recvbuf;
388
3a7b90de 389 /* check for requests on our public socket */
688760ef
PMF
390 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
391 if(result == -1) {
392 ERR("error in ustcomm_ustd_recv_message");
393 continue;
394 }
395 if(result > 0) {
396 if(!strncmp(recvbuf, "collect", 7)) {
397 pid_t pid;
398 char *bufname;
399 int result;
3796af9b 400
688760ef
PMF
401 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
402 if(result != 2) {
403 fprintf(stderr, "parsing error: %s\n", recvbuf);
404 }
3796af9b 405
688760ef
PMF
406 result = add_buffer(pid, bufname);
407 if(result < 0) {
408 ERR("error in add_buffer");
409 continue;
410 }
3796af9b
PMF
411 }
412
688760ef 413 free(recvbuf);
3796af9b 414 }
3796af9b
PMF
415 }
416
417 return 0;
418}
This page took 0.041092 seconds and 4 git commands to generate.