File:  [Qemu by Fabrice Bellard] / qemu / buffered_file.c
Revision 1.1.1.6 (vendor branch): download - view: text, annotated - select for diffs
Tue Apr 24 18:56:31 2018 UTC (3 years, 1 month ago) by root
Branches: qemu, MAIN
CVS tags: qemu1000, qemu0151, HEAD
qemu 0.15.1

    1: /*
    2:  * QEMU buffered QEMUFile
    3:  *
    4:  * Copyright IBM, Corp. 2008
    5:  *
    6:  * Authors:
    7:  *  Anthony Liguori   <aliguori@us.ibm.com>
    8:  *
    9:  * This work is licensed under the terms of the GNU GPL, version 2.  See
   10:  * the COPYING file in the top-level directory.
   11:  *
   12:  */
   13: 
   14: #include "qemu-common.h"
   15: #include "hw/hw.h"
   16: #include "qemu-timer.h"
   17: #include "qemu-char.h"
   18: #include "buffered_file.h"
   19: 
   20: //#define DEBUG_BUFFERED_FILE
   21: 
   22: typedef struct QEMUFileBuffered
   23: {
   24:     BufferedPutFunc *put_buffer;
   25:     BufferedPutReadyFunc *put_ready;
   26:     BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
   27:     BufferedCloseFunc *close;
   28:     void *opaque;
   29:     QEMUFile *file;
   30:     int has_error;
   31:     int freeze_output;
   32:     size_t bytes_xfer;
   33:     size_t xfer_limit;
   34:     uint8_t *buffer;
   35:     size_t buffer_size;
   36:     size_t buffer_capacity;
   37:     QEMUTimer *timer;
   38: } QEMUFileBuffered;
   39: 
   40: #ifdef DEBUG_BUFFERED_FILE
   41: #define DPRINTF(fmt, ...) \
   42:     do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
   43: #else
   44: #define DPRINTF(fmt, ...) \
   45:     do { } while (0)
   46: #endif
   47: 
   48: static void buffered_append(QEMUFileBuffered *s,
   49:                             const uint8_t *buf, size_t size)
   50: {
   51:     if (size > (s->buffer_capacity - s->buffer_size)) {
   52:         void *tmp;
   53: 
   54:         DPRINTF("increasing buffer capacity from %zu by %zu\n",
   55:                 s->buffer_capacity, size + 1024);
   56: 
   57:         s->buffer_capacity += size + 1024;
   58: 
   59:         tmp = qemu_realloc(s->buffer, s->buffer_capacity);
   60:         if (tmp == NULL) {
   61:             fprintf(stderr, "qemu file buffer expansion failed\n");
   62:             exit(1);
   63:         }
   64: 
   65:         s->buffer = tmp;
   66:     }
   67: 
   68:     memcpy(s->buffer + s->buffer_size, buf, size);
   69:     s->buffer_size += size;
   70: }
   71: 
   72: static void buffered_flush(QEMUFileBuffered *s)
   73: {
   74:     size_t offset = 0;
   75: 
   76:     if (s->has_error) {
   77:         DPRINTF("flush when error, bailing\n");
   78:         return;
   79:     }
   80: 
   81:     DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
   82: 
   83:     while (offset < s->buffer_size) {
   84:         ssize_t ret;
   85: 
   86:         ret = s->put_buffer(s->opaque, s->buffer + offset,
   87:                             s->buffer_size - offset);
   88:         if (ret == -EAGAIN) {
   89:             DPRINTF("backend not ready, freezing\n");
   90:             s->freeze_output = 1;
   91:             break;
   92:         }
   93: 
   94:         if (ret <= 0) {
   95:             DPRINTF("error flushing data, %zd\n", ret);
   96:             s->has_error = 1;
   97:             break;
   98:         } else {
   99:             DPRINTF("flushed %zd byte(s)\n", ret);
  100:             offset += ret;
  101:         }
  102:     }
  103: 
  104:     DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
  105:     memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
  106:     s->buffer_size -= offset;
  107: }
  108: 
  109: static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
  110: {
  111:     QEMUFileBuffered *s = opaque;
  112:     int offset = 0;
  113:     ssize_t ret;
  114: 
  115:     DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
  116: 
  117:     if (s->has_error) {
  118:         DPRINTF("flush when error, bailing\n");
  119:         return -EINVAL;
  120:     }
  121: 
  122:     DPRINTF("unfreezing output\n");
  123:     s->freeze_output = 0;
  124: 
  125:     buffered_flush(s);
  126: 
  127:     while (!s->freeze_output && offset < size) {
  128:         if (s->bytes_xfer > s->xfer_limit) {
  129:             DPRINTF("transfer limit exceeded when putting\n");
  130:             break;
  131:         }
  132: 
  133:         ret = s->put_buffer(s->opaque, buf + offset, size - offset);
  134:         if (ret == -EAGAIN) {
  135:             DPRINTF("backend not ready, freezing\n");
  136:             s->freeze_output = 1;
  137:             break;
  138:         }
  139: 
  140:         if (ret <= 0) {
  141:             DPRINTF("error putting\n");
  142:             s->has_error = 1;
  143:             offset = -EINVAL;
  144:             break;
  145:         }
  146: 
  147:         DPRINTF("put %zd byte(s)\n", ret);
  148:         offset += ret;
  149:         s->bytes_xfer += ret;
  150:     }
  151: 
  152:     if (offset >= 0) {
  153:         DPRINTF("buffering %d bytes\n", size - offset);
  154:         buffered_append(s, buf + offset, size - offset);
  155:         offset = size;
  156:     }
  157: 
  158:     if (pos == 0 && size == 0) {
  159:         DPRINTF("file is ready\n");
  160:         if (s->bytes_xfer <= s->xfer_limit) {
  161:             DPRINTF("notifying client\n");
  162:             s->put_ready(s->opaque);
  163:         }
  164:     }
  165: 
  166:     return offset;
  167: }
  168: 
  169: static int buffered_close(void *opaque)
  170: {
  171:     QEMUFileBuffered *s = opaque;
  172:     int ret;
  173: 
  174:     DPRINTF("closing\n");
  175: 
  176:     while (!s->has_error && s->buffer_size) {
  177:         buffered_flush(s);
  178:         if (s->freeze_output)
  179:             s->wait_for_unfreeze(s);
  180:     }
  181: 
  182:     ret = s->close(s->opaque);
  183: 
  184:     qemu_del_timer(s->timer);
  185:     qemu_free_timer(s->timer);
  186:     qemu_free(s->buffer);
  187:     qemu_free(s);
  188: 
  189:     return ret;
  190: }
  191: 
  192: static int buffered_rate_limit(void *opaque)
  193: {
  194:     QEMUFileBuffered *s = opaque;
  195: 
  196:     if (s->has_error)
  197:         return 0;
  198: 
  199:     if (s->freeze_output)
  200:         return 1;
  201: 
  202:     if (s->bytes_xfer > s->xfer_limit)
  203:         return 1;
  204: 
  205:     return 0;
  206: }
  207: 
  208: static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
  209: {
  210:     QEMUFileBuffered *s = opaque;
  211:     if (s->has_error)
  212:         goto out;
  213: 
  214:     if (new_rate > SIZE_MAX) {
  215:         new_rate = SIZE_MAX;
  216:     }
  217: 
  218:     s->xfer_limit = new_rate / 10;
  219:     
  220: out:
  221:     return s->xfer_limit;
  222: }
  223: 
  224: static int64_t buffered_get_rate_limit(void *opaque)
  225: {
  226:     QEMUFileBuffered *s = opaque;
  227:   
  228:     return s->xfer_limit;
  229: }
  230: 
  231: static void buffered_rate_tick(void *opaque)
  232: {
  233:     QEMUFileBuffered *s = opaque;
  234: 
  235:     if (s->has_error) {
  236:         buffered_close(s);
  237:         return;
  238:     }
  239: 
  240:     qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
  241: 
  242:     if (s->freeze_output)
  243:         return;
  244: 
  245:     s->bytes_xfer = 0;
  246: 
  247:     buffered_flush(s);
  248: 
  249:     /* Add some checks around this */
  250:     s->put_ready(s->opaque);
  251: }
  252: 
  253: QEMUFile *qemu_fopen_ops_buffered(void *opaque,
  254:                                   size_t bytes_per_sec,
  255:                                   BufferedPutFunc *put_buffer,
  256:                                   BufferedPutReadyFunc *put_ready,
  257:                                   BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
  258:                                   BufferedCloseFunc *close)
  259: {
  260:     QEMUFileBuffered *s;
  261: 
  262:     s = qemu_mallocz(sizeof(*s));
  263: 
  264:     s->opaque = opaque;
  265:     s->xfer_limit = bytes_per_sec / 10;
  266:     s->put_buffer = put_buffer;
  267:     s->put_ready = put_ready;
  268:     s->wait_for_unfreeze = wait_for_unfreeze;
  269:     s->close = close;
  270: 
  271:     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
  272:                              buffered_close, buffered_rate_limit,
  273:                              buffered_set_rate_limit,
  274: 			     buffered_get_rate_limit);
  275: 
  276:     s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
  277: 
  278:     qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
  279: 
  280:     return s->file;
  281: }

unix.superglobalmegacorp.com