|
|
1.1 root 1: /*
2: * QEMU buffered QEMUFile
3: *
4: * Copyright IBM, Corp. 2008
5: *
6: * Authors:
7: * Anthony Liguori <[email protected]>
8: *
9: * This work is licensed under the terms of the GNU GPL, version 2. See
10: * the COPYING file in the top-level directory.
11: *
12: */
13:
14: #include "qemu-common.h"
15: #include "hw/hw.h"
16: #include "qemu-timer.h"
17: #include "qemu-char.h"
18: #include "buffered_file.h"
19:
20: //#define DEBUG_BUFFERED_FILE
21:
22: typedef struct QEMUFileBuffered
23: {
24: BufferedPutFunc *put_buffer;
25: BufferedPutReadyFunc *put_ready;
26: BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
27: BufferedCloseFunc *close;
28: void *opaque;
29: QEMUFile *file;
30: int has_error;
31: int freeze_output;
32: size_t bytes_xfer;
33: size_t xfer_limit;
34: uint8_t *buffer;
35: size_t buffer_size;
36: size_t buffer_capacity;
37: QEMUTimer *timer;
38: } QEMUFileBuffered;
39:
/*
 * Debug tracing.  With DEBUG_BUFFERED_FILE defined, DPRINTF() prints its
 * message to stdout with a "buffered-file: " prefix; otherwise it expands
 * to an empty statement and its arguments are not evaluated.
 */
#ifdef DEBUG_BUFFERED_FILE
#define DPRINTF(fmt, ...) do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
#else
#define DPRINTF(fmt, ...) do { } while (0)
#endif
47:
48: static void buffered_append(QEMUFileBuffered *s,
49: const uint8_t *buf, size_t size)
50: {
51: if (size > (s->buffer_capacity - s->buffer_size)) {
52: void *tmp;
53:
1.1.1.4 root 54: DPRINTF("increasing buffer capacity from %zu by %zu\n",
1.1 root 55: s->buffer_capacity, size + 1024);
56:
57: s->buffer_capacity += size + 1024;
58:
59: tmp = qemu_realloc(s->buffer, s->buffer_capacity);
60: if (tmp == NULL) {
61: fprintf(stderr, "qemu file buffer expansion failed\n");
62: exit(1);
63: }
64:
65: s->buffer = tmp;
66: }
67:
68: memcpy(s->buffer + s->buffer_size, buf, size);
69: s->buffer_size += size;
70: }
71:
72: static void buffered_flush(QEMUFileBuffered *s)
73: {
74: size_t offset = 0;
75:
76: if (s->has_error) {
1.1.1.4 root 77: DPRINTF("flush when error, bailing\n");
1.1 root 78: return;
79: }
80:
1.1.1.4 root 81: DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
1.1 root 82:
83: while (offset < s->buffer_size) {
84: ssize_t ret;
85:
86: ret = s->put_buffer(s->opaque, s->buffer + offset,
87: s->buffer_size - offset);
88: if (ret == -EAGAIN) {
1.1.1.4 root 89: DPRINTF("backend not ready, freezing\n");
1.1 root 90: s->freeze_output = 1;
91: break;
92: }
93:
94: if (ret <= 0) {
1.1.1.4 root 95: DPRINTF("error flushing data, %zd\n", ret);
1.1 root 96: s->has_error = 1;
97: break;
98: } else {
1.1.1.4 root 99: DPRINTF("flushed %zd byte(s)\n", ret);
1.1 root 100: offset += ret;
101: }
102: }
103:
1.1.1.4 root 104: DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
1.1 root 105: memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
106: s->buffer_size -= offset;
107: }
108:
109: static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
110: {
111: QEMUFileBuffered *s = opaque;
112: int offset = 0;
113: ssize_t ret;
114:
1.1.1.4 root 115: DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
1.1 root 116:
117: if (s->has_error) {
1.1.1.4 root 118: DPRINTF("flush when error, bailing\n");
1.1 root 119: return -EINVAL;
120: }
121:
1.1.1.4 root 122: DPRINTF("unfreezing output\n");
1.1 root 123: s->freeze_output = 0;
124:
125: buffered_flush(s);
126:
127: while (!s->freeze_output && offset < size) {
128: if (s->bytes_xfer > s->xfer_limit) {
1.1.1.4 root 129: DPRINTF("transfer limit exceeded when putting\n");
1.1 root 130: break;
131: }
132:
133: ret = s->put_buffer(s->opaque, buf + offset, size - offset);
134: if (ret == -EAGAIN) {
1.1.1.4 root 135: DPRINTF("backend not ready, freezing\n");
1.1 root 136: s->freeze_output = 1;
137: break;
138: }
139:
140: if (ret <= 0) {
1.1.1.4 root 141: DPRINTF("error putting\n");
1.1 root 142: s->has_error = 1;
143: offset = -EINVAL;
144: break;
145: }
146:
1.1.1.4 root 147: DPRINTF("put %zd byte(s)\n", ret);
1.1 root 148: offset += ret;
149: s->bytes_xfer += ret;
150: }
151:
152: if (offset >= 0) {
1.1.1.4 root 153: DPRINTF("buffering %d bytes\n", size - offset);
1.1 root 154: buffered_append(s, buf + offset, size - offset);
155: offset = size;
156: }
157:
1.1.1.4 root 158: if (pos == 0 && size == 0) {
159: DPRINTF("file is ready\n");
160: if (s->bytes_xfer <= s->xfer_limit) {
161: DPRINTF("notifying client\n");
162: s->put_ready(s->opaque);
163: }
164: }
165:
1.1 root 166: return offset;
167: }
168:
169: static int buffered_close(void *opaque)
170: {
171: QEMUFileBuffered *s = opaque;
172: int ret;
173:
1.1.1.4 root 174: DPRINTF("closing\n");
1.1 root 175:
176: while (!s->has_error && s->buffer_size) {
177: buffered_flush(s);
178: if (s->freeze_output)
179: s->wait_for_unfreeze(s);
180: }
181:
182: ret = s->close(s->opaque);
183:
184: qemu_del_timer(s->timer);
185: qemu_free_timer(s->timer);
186: qemu_free(s->buffer);
187: qemu_free(s);
188:
189: return ret;
190: }
191:
192: static int buffered_rate_limit(void *opaque)
193: {
194: QEMUFileBuffered *s = opaque;
195:
196: if (s->has_error)
197: return 0;
198:
199: if (s->freeze_output)
200: return 1;
201:
202: if (s->bytes_xfer > s->xfer_limit)
203: return 1;
204:
205: return 0;
206: }
207:
1.1.1.5 root 208: static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
1.1.1.2 root 209: {
210: QEMUFileBuffered *s = opaque;
211: if (s->has_error)
212: goto out;
213:
1.1.1.5 root 214: if (new_rate > SIZE_MAX) {
215: new_rate = SIZE_MAX;
216: }
217:
1.1.1.2 root 218: s->xfer_limit = new_rate / 10;
219:
220: out:
221: return s->xfer_limit;
222: }
223:
1.1.1.5 root 224: static int64_t buffered_get_rate_limit(void *opaque)
1.1.1.3 root 225: {
226: QEMUFileBuffered *s = opaque;
227:
228: return s->xfer_limit;
229: }
230:
1.1 root 231: static void buffered_rate_tick(void *opaque)
232: {
233: QEMUFileBuffered *s = opaque;
234:
1.1.1.4 root 235: if (s->has_error) {
236: buffered_close(s);
1.1 root 237: return;
1.1.1.4 root 238: }
1.1 root 239:
1.1.1.6 ! root 240: qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
1.1 root 241:
242: if (s->freeze_output)
243: return;
244:
245: s->bytes_xfer = 0;
246:
247: buffered_flush(s);
248:
249: /* Add some checks around this */
250: s->put_ready(s->opaque);
251: }
252:
253: QEMUFile *qemu_fopen_ops_buffered(void *opaque,
254: size_t bytes_per_sec,
255: BufferedPutFunc *put_buffer,
256: BufferedPutReadyFunc *put_ready,
257: BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
258: BufferedCloseFunc *close)
259: {
260: QEMUFileBuffered *s;
261:
262: s = qemu_mallocz(sizeof(*s));
263:
264: s->opaque = opaque;
265: s->xfer_limit = bytes_per_sec / 10;
266: s->put_buffer = put_buffer;
267: s->put_ready = put_ready;
268: s->wait_for_unfreeze = wait_for_unfreeze;
269: s->close = close;
270:
271: s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
1.1.1.2 root 272: buffered_close, buffered_rate_limit,
1.1.1.3 root 273: buffered_set_rate_limit,
274: buffered_get_rate_limit);
1.1 root 275:
1.1.1.6 ! root 276: s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
1.1 root 277:
1.1.1.6 ! root 278: qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
1.1 root 279:
280: return s->file;
281: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.