|
|
1.1 root 1: /*
2: * QEMU buffered QEMUFile
3: *
4: * Copyright IBM, Corp. 2008
5: *
6: * Authors:
7: * Anthony Liguori <[email protected]>
8: *
9: * This work is licensed under the terms of the GNU GPL, version 2. See
10: * the COPYING file in the top-level directory.
11: *
1.1.1.8 ! root 12: * Contributions after 2012-01-13 are licensed under the terms of the
! 13: * GNU GPL, version 2 or (at your option) any later version.
1.1 root 14: */
15:
16: #include "qemu-common.h"
17: #include "hw/hw.h"
18: #include "qemu-timer.h"
19: #include "qemu-char.h"
20: #include "buffered_file.h"
21:
22: //#define DEBUG_BUFFERED_FILE
23:
24: typedef struct QEMUFileBuffered
25: {
26: BufferedPutFunc *put_buffer;
27: BufferedPutReadyFunc *put_ready;
28: BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
29: BufferedCloseFunc *close;
30: void *opaque;
31: QEMUFile *file;
32: int freeze_output;
33: size_t bytes_xfer;
34: size_t xfer_limit;
35: uint8_t *buffer;
36: size_t buffer_size;
37: size_t buffer_capacity;
38: QEMUTimer *timer;
39: } QEMUFileBuffered;
40:
/*
 * Debug tracing.  DPRINTF() prints to stdout with a "buffered-file: "
 * prefix when DEBUG_BUFFERED_FILE is defined; otherwise it expands to an
 * empty statement and its arguments are not evaluated.
 */
#ifndef DEBUG_BUFFERED_FILE
#define DPRINTF(fmt, ...) \
    do { } while (0)
#else
#define DPRINTF(fmt, ...) \
    do { \
        printf("buffered-file: " fmt, ## __VA_ARGS__); \
    } while (0)
#endif
48:
49: static void buffered_append(QEMUFileBuffered *s,
50: const uint8_t *buf, size_t size)
51: {
52: if (size > (s->buffer_capacity - s->buffer_size)) {
53: void *tmp;
54:
1.1.1.4 root 55: DPRINTF("increasing buffer capacity from %zu by %zu\n",
1.1 root 56: s->buffer_capacity, size + 1024);
57:
58: s->buffer_capacity += size + 1024;
59:
1.1.1.7 root 60: tmp = g_realloc(s->buffer, s->buffer_capacity);
1.1 root 61: if (tmp == NULL) {
62: fprintf(stderr, "qemu file buffer expansion failed\n");
63: exit(1);
64: }
65:
66: s->buffer = tmp;
67: }
68:
69: memcpy(s->buffer + s->buffer_size, buf, size);
70: s->buffer_size += size;
71: }
72:
73: static void buffered_flush(QEMUFileBuffered *s)
74: {
75: size_t offset = 0;
1.1.1.7 root 76: int error;
1.1 root 77:
1.1.1.7 root 78: error = qemu_file_get_error(s->file);
79: if (error != 0) {
80: DPRINTF("flush when error, bailing: %s\n", strerror(-error));
1.1 root 81: return;
82: }
83:
1.1.1.4 root 84: DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
1.1 root 85:
86: while (offset < s->buffer_size) {
87: ssize_t ret;
88:
89: ret = s->put_buffer(s->opaque, s->buffer + offset,
90: s->buffer_size - offset);
91: if (ret == -EAGAIN) {
1.1.1.4 root 92: DPRINTF("backend not ready, freezing\n");
1.1 root 93: s->freeze_output = 1;
94: break;
95: }
96:
97: if (ret <= 0) {
1.1.1.4 root 98: DPRINTF("error flushing data, %zd\n", ret);
1.1.1.7 root 99: qemu_file_set_error(s->file, ret);
1.1 root 100: break;
101: } else {
1.1.1.4 root 102: DPRINTF("flushed %zd byte(s)\n", ret);
1.1 root 103: offset += ret;
104: }
105: }
106:
1.1.1.4 root 107: DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
1.1 root 108: memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
109: s->buffer_size -= offset;
110: }
111:
112: static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
113: {
114: QEMUFileBuffered *s = opaque;
1.1.1.7 root 115: int offset = 0, error;
1.1 root 116: ssize_t ret;
117:
1.1.1.4 root 118: DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
1.1 root 119:
1.1.1.7 root 120: error = qemu_file_get_error(s->file);
121: if (error) {
122: DPRINTF("flush when error, bailing: %s\n", strerror(-error));
123: return error;
1.1 root 124: }
125:
1.1.1.4 root 126: DPRINTF("unfreezing output\n");
1.1 root 127: s->freeze_output = 0;
128:
129: buffered_flush(s);
130:
131: while (!s->freeze_output && offset < size) {
132: if (s->bytes_xfer > s->xfer_limit) {
1.1.1.4 root 133: DPRINTF("transfer limit exceeded when putting\n");
1.1 root 134: break;
135: }
136:
137: ret = s->put_buffer(s->opaque, buf + offset, size - offset);
138: if (ret == -EAGAIN) {
1.1.1.4 root 139: DPRINTF("backend not ready, freezing\n");
1.1 root 140: s->freeze_output = 1;
141: break;
142: }
143:
144: if (ret <= 0) {
1.1.1.4 root 145: DPRINTF("error putting\n");
1.1.1.7 root 146: qemu_file_set_error(s->file, ret);
1.1 root 147: offset = -EINVAL;
148: break;
149: }
150:
1.1.1.4 root 151: DPRINTF("put %zd byte(s)\n", ret);
1.1 root 152: offset += ret;
153: s->bytes_xfer += ret;
154: }
155:
156: if (offset >= 0) {
1.1.1.4 root 157: DPRINTF("buffering %d bytes\n", size - offset);
1.1 root 158: buffered_append(s, buf + offset, size - offset);
159: offset = size;
160: }
161:
1.1.1.4 root 162: if (pos == 0 && size == 0) {
163: DPRINTF("file is ready\n");
164: if (s->bytes_xfer <= s->xfer_limit) {
165: DPRINTF("notifying client\n");
166: s->put_ready(s->opaque);
167: }
168: }
169:
1.1 root 170: return offset;
171: }
172:
173: static int buffered_close(void *opaque)
174: {
175: QEMUFileBuffered *s = opaque;
176: int ret;
177:
1.1.1.4 root 178: DPRINTF("closing\n");
1.1 root 179:
1.1.1.7 root 180: while (!qemu_file_get_error(s->file) && s->buffer_size) {
1.1 root 181: buffered_flush(s);
182: if (s->freeze_output)
1.1.1.7 root 183: s->wait_for_unfreeze(s->opaque);
1.1 root 184: }
185:
186: ret = s->close(s->opaque);
187:
188: qemu_del_timer(s->timer);
189: qemu_free_timer(s->timer);
1.1.1.7 root 190: g_free(s->buffer);
191: g_free(s);
1.1 root 192:
193: return ret;
194: }
195:
1.1.1.7 root 196: /*
197: * The meaning of the return values is:
198: * 0: We can continue sending
199: * 1: Time to stop
200: * negative: There has been an error
201: */
1.1 root 202: static int buffered_rate_limit(void *opaque)
203: {
204: QEMUFileBuffered *s = opaque;
1.1.1.7 root 205: int ret;
1.1 root 206:
1.1.1.7 root 207: ret = qemu_file_get_error(s->file);
208: if (ret) {
209: return ret;
210: }
1.1 root 211: if (s->freeze_output)
212: return 1;
213:
214: if (s->bytes_xfer > s->xfer_limit)
215: return 1;
216:
217: return 0;
218: }
219:
1.1.1.5 root 220: static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
1.1.1.2 root 221: {
222: QEMUFileBuffered *s = opaque;
1.1.1.7 root 223: if (qemu_file_get_error(s->file)) {
1.1.1.2 root 224: goto out;
1.1.1.7 root 225: }
1.1.1.5 root 226: if (new_rate > SIZE_MAX) {
227: new_rate = SIZE_MAX;
228: }
229:
1.1.1.2 root 230: s->xfer_limit = new_rate / 10;
231:
232: out:
233: return s->xfer_limit;
234: }
235:
1.1.1.5 root 236: static int64_t buffered_get_rate_limit(void *opaque)
1.1.1.3 root 237: {
238: QEMUFileBuffered *s = opaque;
239:
240: return s->xfer_limit;
241: }
242:
1.1 root 243: static void buffered_rate_tick(void *opaque)
244: {
245: QEMUFileBuffered *s = opaque;
246:
1.1.1.7 root 247: if (qemu_file_get_error(s->file)) {
1.1.1.4 root 248: buffered_close(s);
1.1 root 249: return;
1.1.1.4 root 250: }
1.1 root 251:
1.1.1.6 root 252: qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
1.1 root 253:
254: if (s->freeze_output)
255: return;
256:
257: s->bytes_xfer = 0;
258:
259: buffered_flush(s);
260:
261: /* Add some checks around this */
262: s->put_ready(s->opaque);
263: }
264:
265: QEMUFile *qemu_fopen_ops_buffered(void *opaque,
266: size_t bytes_per_sec,
267: BufferedPutFunc *put_buffer,
268: BufferedPutReadyFunc *put_ready,
269: BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
270: BufferedCloseFunc *close)
271: {
272: QEMUFileBuffered *s;
273:
1.1.1.7 root 274: s = g_malloc0(sizeof(*s));
1.1 root 275:
276: s->opaque = opaque;
277: s->xfer_limit = bytes_per_sec / 10;
278: s->put_buffer = put_buffer;
279: s->put_ready = put_ready;
280: s->wait_for_unfreeze = wait_for_unfreeze;
281: s->close = close;
282:
283: s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
1.1.1.2 root 284: buffered_close, buffered_rate_limit,
1.1.1.3 root 285: buffered_set_rate_limit,
286: buffered_get_rate_limit);
1.1 root 287:
1.1.1.6 root 288: s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
1.1 root 289:
1.1.1.6 root 290: qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
1.1 root 291:
292: return s->file;
293: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.