ust: first try at blocking support for consumer
[lttng-ust.git] / ustd / ustd.c
CommitLineData
3796af9b
PMF
#define _GNU_SOURCE

#include <sys/types.h>
#include <sys/shm.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "localerr.h"
#include "ustcomm.h"
14
811e4b93
PMF
/*
 * Global list of buffers being consumed. add_buffer() links each new
 * struct buffer_info here (through its `list` member) and main() walks the
 * list on every iteration of its loop.
 */
struct list_head buffers = LIST_HEAD_INIT(buffers);
16
3796af9b
PMF
/*
 * Per-buffer state kept by the consumer. One instance is allocated by
 * add_buffer() for each (pid, buffer name) pair and linked into the global
 * `buffers` list.
 */
struct buffer_info {
	char *name;	/* buffer name, as passed to add_buffer(); used in every request */
	pid_t pid;	/* process that requests about this buffer are sent to */

	int shmid;	/* value parsed from the "get_shmid" reply; passed to shmat() */
	void *mem;	/* address returned by shmat() */
	int memlen;	/* NOTE(review): never assigned in the visible code */

	int n_subbufs;		/* value parsed from the "get_n_subbufs" reply */
	int subbuf_size;	/* value parsed from the "get_subbuf_size" reply */

	int file_fd; /* output file */

	struct list_head list;	/* linkage in the global `buffers` list */

	/*
	 * Value parsed from the "get_subbuffer" reply; it is used to compute
	 * the offset of the sub-buffer inside `mem` and is echoed back in the
	 * "put_subbuffer" request.
	 */
	long consumed_old;
};
34
35int add_buffer(pid_t pid, char *bufname)
36{
37 struct buffer_info *buf;
38 char *send_msg;
39 char *received_msg;
40 int result;
688760ef
PMF
41 char *tmp;
42 int fd;
3796af9b
PMF
43
44 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
45 if(buf == NULL) {
46 ERR("add_buffer: insufficient memory");
47 return -1;
48 }
49
50 buf->name = bufname;
51 buf->pid = pid;
52
53 /* get shmid */
54 asprintf(&send_msg, "get_shmid %s", buf->name);
55 send_message(pid, send_msg, &received_msg);
56 free(send_msg);
811e4b93 57 DBG("got buffer name %s", buf->name);
3796af9b
PMF
58
59 result = sscanf(received_msg, "%d", &buf->shmid);
60 if(result != 1) {
61 ERR("unable to parse response to get_shmid");
62 return -1;
63 }
64 free(received_msg);
811e4b93 65 DBG("got shmid %d", buf->shmid);
3796af9b 66
811e4b93 67 /* get n_subbufs */
3796af9b
PMF
68 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
69 send_message(pid, send_msg, &received_msg);
70 free(send_msg);
71
811e4b93 72 result = sscanf(received_msg, "%d", &buf->n_subbufs);
3796af9b 73 if(result != 1) {
811e4b93
PMF
74 ERR("unable to parse response to get_n_subbufs");
75 return -1;
76 }
77 free(received_msg);
78 DBG("got n_subbufs %d", buf->n_subbufs);
79
80 /* get subbuf size */
81 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
82 send_message(pid, send_msg, &received_msg);
83 free(send_msg);
84
85 result = sscanf(received_msg, "%d", &buf->subbuf_size);
86 if(result != 1) {
87 ERR("unable to parse response to get_subbuf_size");
3796af9b
PMF
88 return -1;
89 }
90 free(received_msg);
811e4b93 91 DBG("got subbuf_size %d", buf->subbuf_size);
3796af9b
PMF
92
93 /* attach memory */
94 buf->mem = shmat(buf->shmid, NULL, 0);
95 if(buf->mem == (void *) 0) {
96 perror("shmat");
97 return -1;
98 }
811e4b93 99 DBG("successfully attached memory");
3796af9b 100
688760ef
PMF
101 /* open file for output */
102 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
103 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
104 if(result == -1) {
105 PERROR("open");
106 return -1;
107 }
108 buf->file_fd = fd;
109 free(tmp);
110
111 list_add(&buf->list, &buffers);
112
3796af9b
PMF
113 return 0;
114}
115
688760ef
PMF
116int get_subbuffer(struct buffer_info *buf)
117{
118 char *send_msg;
119 char *received_msg;
120 char *rep_code;
121 int retval;
122 int result;
123
124 asprintf(&send_msg, "get_subbuffer %s", buf->name);
125 result = send_message(buf->pid, send_msg, &received_msg);
126 if(result < 0) {
127 ERR("get_subbuffer: send_message failed");
128 return -1;
129 }
130 free(send_msg);
131
132 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
133 if(result != 2) {
134 ERR("unable to parse response to get_subbuffer");
135 return -1;
136 }
137 free(received_msg);
138
139 if(!strcmp(rep_code, "OK")) {
140 DBG("got subbuffer %s", buf->name);
141 retval = 1;
142 }
143 else {
144 DBG("did not get subbuffer %s", buf->name);
145 retval = 0;
146 }
147
148 free(rep_code);
149 return retval;
150}
151
152int put_subbuffer(struct buffer_info *buf)
153{
154 char *send_msg;
155 char *received_msg;
156 char *rep_code;
157 int retval;
158 int result;
159
160 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
161 result = send_message(buf->pid, send_msg, &received_msg);
162 if(result < 0) {
163 ERR("put_subbuffer: send_message failed");
164 return -1;
165 }
166 free(send_msg);
167
168 result = sscanf(received_msg, "%as", &rep_code);
169 if(result != 1) {
170 ERR("unable to parse response to put_subbuffer");
171 return -1;
172 }
173 free(received_msg);
174
175 if(!strcmp(rep_code, "OK")) {
176 DBG("subbuffer put %s", buf->name);
177 retval = 1;
178 }
179 else {
180 ERR("invalid response to put_subbuffer");
181 }
182
183 free(rep_code);
184 return retval;
185}
186
/*
 * Write `count` bytes from `buf` to `fd`, calling write() repeatedly until
 * everything has been written.
 *
 * A write() interrupted by a signal before transferring any data (EINTR)
 * is retried rather than reported as a failure.
 *
 * Returns the total number of bytes written (equal to `count`) on success.
 * If any write() call returns 0 or -1 (other than EINTR), that value is
 * returned as-is, even if earlier calls already wrote part of the data;
 * errno is left as set by write().
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;	/* ssize_t, not int: write() may return more than INT_MAX */

	for(;;) {
		result = write(fd, bufc, count);
		if(result == -1 && errno == EINTR) {
			/* nothing was written yet; try the same chunk again */
			continue;
		}
		if(result <= 0) {
			return result;
		}
		count -= result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc-(const char *)buf;
}
207
3796af9b
PMF
208int main(int argc, char **argv)
209{
210 struct ustcomm_ustd ustd;
211 int result;
212
213 result = ustcomm_init_ustd(&ustd);
214 if(result == -1) {
215 ERR("failed to initialize socket");
216 return 1;
217 }
218
688760ef 219 /* app loop */
3796af9b
PMF
220 for(;;) {
221 char *recvbuf;
688760ef 222 struct buffer_info *buf;
3796af9b 223
688760ef
PMF
224 /* 1. check for requests on our public socket */
225 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
226 if(result == -1) {
227 ERR("error in ustcomm_ustd_recv_message");
228 continue;
229 }
230 if(result > 0) {
231 if(!strncmp(recvbuf, "collect", 7)) {
232 pid_t pid;
233 char *bufname;
234 int result;
3796af9b 235
688760ef
PMF
236 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
237 if(result != 2) {
238 fprintf(stderr, "parsing error: %s\n", recvbuf);
239 }
3796af9b 240
688760ef
PMF
241 result = add_buffer(pid, bufname);
242 if(result < 0) {
243 ERR("error in add_buffer");
244 continue;
245 }
3796af9b
PMF
246 }
247
688760ef 248 free(recvbuf);
3796af9b
PMF
249 }
250
688760ef
PMF
251 /* 2. try to consume data from tracing apps */
252 list_for_each_entry(buf, &buffers, list) {
253 result = get_subbuffer(buf);
254 if(result == -1) {
255 ERR("error getting subbuffer");
256 continue;
257 }
258 if(result == 0)
259 continue;
260
261 /* write data to file */
262 //result = write(buf->file_fd, buf->, );
263 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
264 if(result == -1) {
265 PERROR("write");
266 /* FIXME: maybe drop this trace */
267 }
268
269 result = put_subbuffer(buf);
270 if(result == -1) {
271 ERR("error putting subbuffer");
272 }
273 }
3796af9b
PMF
274 }
275
276 return 0;
277}
This page took 0.034372 seconds and 4 git commands to generate.