Annotation of qemu/block/stream.c, revision 1.1.1.1

1.1       root        1: /*
                      2:  * Image streaming
                      3:  *
                      4:  * Copyright IBM, Corp. 2011
                      5:  *
                      6:  * Authors:
                      7:  *  Stefan Hajnoczi   <[email protected]>
                      8:  *
                      9:  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
                     10:  * See the COPYING.LIB file in the top-level directory.
                     11:  *
                     12:  */
                     13: 
                     14: #include "trace.h"
                     15: #include "block_int.h"
                     16: 
enum {
    /*
     * Size of data buffer for populating the image file.  This should be large
     * enough to process multiple clusters in a single call, so that populating
     * contiguous regions of the image is efficient.  It also bounds how many
     * sectors a single streaming iteration examines and copies.
     */
    STREAM_BUFFER_SIZE = 512 * 1024, /* in bytes */
};

/* Length of one rate-limiting time slice: 100 ms, expressed in nanoseconds. */
#define SLICE_TIME 100000000ULL /* ns */

/*
 * Rate-limiting state.  Work is accounted in fixed time slices of SLICE_TIME
 * nanoseconds; at most slice_quota units may be admitted per slice.
 */
typedef struct {
    int64_t next_slice_time;  /* end of the current slice, in ns of rt_clock */
    uint64_t slice_quota;     /* units allowed per slice (units are whatever the
                               * caller of ratelimit_set_speed() uses) */
    uint64_t dispatched;      /* units accounted so far in the current slice */
} RateLimit;
                     33: 
                     34: static int64_t ratelimit_calculate_delay(RateLimit *limit, uint64_t n)
                     35: {
                     36:     int64_t now = qemu_get_clock_ns(rt_clock);
                     37: 
                     38:     if (limit->next_slice_time < now) {
                     39:         limit->next_slice_time = now + SLICE_TIME;
                     40:         limit->dispatched = 0;
                     41:     }
                     42:     if (limit->dispatched == 0 || limit->dispatched + n <= limit->slice_quota) {
                     43:         limit->dispatched += n;
                     44:         return 0;
                     45:     } else {
                     46:         limit->dispatched = n;
                     47:         return limit->next_slice_time - now;
                     48:     }
                     49: }
                     50: 
                     51: static void ratelimit_set_speed(RateLimit *limit, uint64_t speed)
                     52: {
                     53:     limit->slice_quota = speed / (1000000000ULL / SLICE_TIME);
                     54: }
                     55: 
/*
 * State of one image-streaming job.  The generic BlockJob is embedded as the
 * first member; stream_set_speed() recovers the StreamBlockJob from it with
 * container_of().
 */
typedef struct StreamBlockJob {
    BlockJob common;
    RateLimit limit;            /* throttling state, set by stream_set_speed() */
    BlockDriverState *base;     /* image where streaming stops; NULL means the
                                 * whole backing chain is streamed */
    char backing_file_id[1024]; /* passed to bdrv_change_backing_file() on
                                 * successful completion when base is set */
} StreamBlockJob;
                     62: 
                     63: static int coroutine_fn stream_populate(BlockDriverState *bs,
                     64:                                         int64_t sector_num, int nb_sectors,
                     65:                                         void *buf)
                     66: {
                     67:     struct iovec iov = {
                     68:         .iov_base = buf,
                     69:         .iov_len  = nb_sectors * BDRV_SECTOR_SIZE,
                     70:     };
                     71:     QEMUIOVector qiov;
                     72: 
                     73:     qemu_iovec_init_external(&qiov, &iov, 1);
                     74: 
                     75:     /* Copy-on-read the unallocated clusters */
                     76:     return bdrv_co_copy_on_readv(bs, sector_num, nb_sectors, &qiov);
                     77: }
                     78: 
                     79: static void close_unused_images(BlockDriverState *top, BlockDriverState *base,
                     80:                                 const char *base_id)
                     81: {
                     82:     BlockDriverState *intermediate;
                     83:     intermediate = top->backing_hd;
                     84: 
                     85:     while (intermediate) {
                     86:         BlockDriverState *unused;
                     87: 
                     88:         /* reached base */
                     89:         if (intermediate == base) {
                     90:             break;
                     91:         }
                     92: 
                     93:         unused = intermediate;
                     94:         intermediate = intermediate->backing_hd;
                     95:         unused->backing_hd = NULL;
                     96:         bdrv_delete(unused);
                     97:     }
                     98:     top->backing_hd = base;
                     99: }
                    100: 
                    101: /*
                    102:  * Given an image chain: [BASE] -> [INTER1] -> [INTER2] -> [TOP]
                    103:  *
                    104:  * Return true if the given sector is allocated in top.
                    105:  * Return false if the given sector is allocated in intermediate images.
                    106:  * Return true otherwise.
                    107:  *
                    108:  * 'pnum' is set to the number of sectors (including and immediately following
                    109:  *  the specified sector) that are known to be in the same
                    110:  *  allocated/unallocated state.
                    111:  *
                    112:  */
                    113: static int coroutine_fn is_allocated_base(BlockDriverState *top,
                    114:                                           BlockDriverState *base,
                    115:                                           int64_t sector_num,
                    116:                                           int nb_sectors, int *pnum)
                    117: {
                    118:     BlockDriverState *intermediate;
                    119:     int ret, n;
                    120: 
                    121:     ret = bdrv_co_is_allocated(top, sector_num, nb_sectors, &n);
                    122:     if (ret) {
                    123:         *pnum = n;
                    124:         return ret;
                    125:     }
                    126: 
                    127:     /*
                    128:      * Is the unallocated chunk [sector_num, n] also
                    129:      * unallocated between base and top?
                    130:      */
                    131:     intermediate = top->backing_hd;
                    132: 
                    133:     while (intermediate != base) {
                    134:         int pnum_inter;
                    135: 
                    136:         ret = bdrv_co_is_allocated(intermediate, sector_num, nb_sectors,
                    137:                                    &pnum_inter);
                    138:         if (ret < 0) {
                    139:             return ret;
                    140:         } else if (ret) {
                    141:             *pnum = pnum_inter;
                    142:             return 0;
                    143:         }
                    144: 
                    145:         /*
                    146:          * [sector_num, nb_sectors] is unallocated on top but intermediate
                    147:          * might have
                    148:          *
                    149:          * [sector_num+x, nr_sectors] allocated.
                    150:          */
                    151:         if (n > pnum_inter) {
                    152:             n = pnum_inter;
                    153:         }
                    154: 
                    155:         intermediate = intermediate->backing_hd;
                    156:     }
                    157: 
                    158:     *pnum = n;
                    159:     return 1;
                    160: }
                    161: 
/*
 * Coroutine body of the streaming job.
 *
 * Walks the top image (bs) from sector 0 to its end in chunks of at most
 * STREAM_BUFFER_SIZE bytes.  Chunks for which is_allocated_base() returns 0
 * (unallocated in bs but allocated in an intermediate image) are copied into
 * bs with a copy-on-read request; other chunks are skipped.  Progress is
 * published in s->common.offset.
 *
 * If the whole image was processed without error or cancellation, the
 * backing file recorded in bs is changed to base (or to none when base is
 * NULL) and the intermediate images are closed.  The job always finishes via
 * block_job_complete(), with a negative value on failure.
 */
static void coroutine_fn stream_run(void *opaque)
{
    StreamBlockJob *s = opaque;
    BlockDriverState *bs = s->common.bs;
    BlockDriverState *base = s->base;
    int64_t sector_num, end;
    int ret = 0;
    int n = 0;
    void *buf;

    s->common.len = bdrv_getlength(bs);
    if (s->common.len < 0) {
        /* Could not size the image: fail the job with that error. */
        block_job_complete(&s->common, s->common.len);
        return;
    }

    /* Image length in whole sectors. */
    end = s->common.len >> BDRV_SECTOR_BITS;
    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);

    /* Turn on copy-on-read for the whole block device so that guest read
     * requests help us make progress.  Only do this when copying the entire
     * backing chain since the copy-on-read operation does not take base into
     * account.
     */
    if (!base) {
        bdrv_enable_copy_on_read(bs);
    }

    for (sector_num = 0; sector_num < end; sector_num += n) {
        uint64_t delay_ns = 0;

wait:
        /* Note that even when no rate limit is applied we need to yield
         * with no pending I/O here so that qemu_aio_flush() returns.
         */
        block_job_sleep_ns(&s->common, rt_clock, delay_ns);
        if (block_job_is_cancelled(&s->common)) {
            break;
        }

        /* n receives the length of the run that shares one state. */
        ret = is_allocated_base(bs, base, sector_num,
                                STREAM_BUFFER_SIZE / BDRV_SECTOR_SIZE, &n);
        trace_stream_one_iteration(s, sector_num, n, ret);
        if (ret == 0) {
            /* This run has to be copied into bs. */
            if (s->common.speed) {
                delay_ns = ratelimit_calculate_delay(&s->limit, n);
                if (delay_ns > 0) {
                    /* Over the rate limit: sleep, then re-query this
                     * same sector_num. */
                    goto wait;
                }
            }
            ret = stream_populate(bs, sector_num, n, buf);
        }
        if (ret < 0) {
            break;
        }
        /* A skipped run leaves ret positive; normalize so that a loop that
         * runs to completion exits with ret == 0. */
        ret = 0;

        /* Publish progress */
        s->common.offset += n * BDRV_SECTOR_SIZE;
    }

    if (!base) {
        bdrv_disable_copy_on_read(bs);
    }

    /* sector_num == end only if the loop ran to completion (no break). */
    if (!block_job_is_cancelled(&s->common) && sector_num == end && ret == 0) {
        const char *base_id = NULL, *base_fmt = NULL;
        if (base) {
            base_id = s->backing_file_id;
            if (base->drv) {
                base_fmt = base->drv->format_name;
            }
        }
        ret = bdrv_change_backing_file(bs, base_id, base_fmt);
        close_unused_images(bs, base, base_id);
    }

    qemu_vfree(buf);
    block_job_complete(&s->common, ret);
}
                    242: 
                    243: static void stream_set_speed(BlockJob *job, int64_t speed, Error **errp)
                    244: {
                    245:     StreamBlockJob *s = container_of(job, StreamBlockJob, common);
                    246: 
                    247:     if (speed < 0) {
                    248:         error_set(errp, QERR_INVALID_PARAMETER, "speed");
                    249:         return;
                    250:     }
                    251:     ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE);
                    252: }
                    253: 
                    254: static BlockJobType stream_job_type = {
                    255:     .instance_size = sizeof(StreamBlockJob),
                    256:     .job_type      = "stream",
                    257:     .set_speed     = stream_set_speed,
                    258: };
                    259: 
                    260: void stream_start(BlockDriverState *bs, BlockDriverState *base,
                    261:                   const char *base_id, int64_t speed,
                    262:                   BlockDriverCompletionFunc *cb,
                    263:                   void *opaque, Error **errp)
                    264: {
                    265:     StreamBlockJob *s;
                    266: 
                    267:     s = block_job_create(&stream_job_type, bs, speed, cb, opaque, errp);
                    268:     if (!s) {
                    269:         return;
                    270:     }
                    271: 
                    272:     s->base = base;
                    273:     if (base_id) {
                    274:         pstrcpy(s->backing_file_id, sizeof(s->backing_file_id), base_id);
                    275:     }
                    276: 
                    277:     s->common.co = qemu_coroutine_create(stream_run);
                    278:     trace_stream_start(bs, base, s, s->common.co, opaque);
                    279:     qemu_coroutine_enter(s->common.co, s);
                    280: }

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.