Annotation of qemu/block/rbd.c, revision 1.1.1.1

1.1       root        1: /*
                      2:  * QEMU Block driver for RADOS (Ceph)
                      3:  *
                      4:  * Copyright (C) 2010 Christian Brunner <[email protected]>
                      5:  *
                      6:  * This work is licensed under the terms of the GNU GPL, version 2.  See
                      7:  * the COPYING file in the top-level directory.
                      8:  *
                      9:  */
                     10: 
                     11: #include "qemu-common.h"
                     12: #include "qemu-error.h"
                     13: 
                     14: #include "rbd_types.h"
                     15: #include "block_int.h"
                     16: 
                     17: #include <rados/librados.h>
                     18: 
                     19: 
                     20: 
                     21: /*
                     22:  * When specifying the image filename use:
                     23:  *
                     24:  * rbd:poolname/devicename
                     25:  *
                     26:  * poolname must be the name of an existing rados pool
                     27:  *
                     28:  * devicename is the basename for all objects used to
                     29:  * emulate the raw device.
                     30:  *
                     31:  * Metadata information (image size, ...) is stored in an
                     32:  * object with the name "devicename.rbd".
                     33:  *
                     34:  * The raw device is split into 4MB sized objects by default.
                     35:  * The sequencenumber is encoded in a 12 byte long hex-string,
                     36:  * and is attached to the devicename, separated by a dot.
                     37:  * e.g. "devicename.1234567890ab"
                     38:  *
                     39:  */
                     40: 
                     41: #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
                     42: 
                     43: typedef struct RBDAIOCB {
                     44:     BlockDriverAIOCB common;
                     45:     QEMUBH *bh;
                     46:     int ret;
                     47:     QEMUIOVector *qiov;
                     48:     char *bounce;
                     49:     int write;
                     50:     int64_t sector_num;
                     51:     int aiocnt;
                     52:     int error;
                     53:     struct BDRVRBDState *s;
                     54:     int cancelled;
                     55: } RBDAIOCB;
                     56: 
                     57: typedef struct RADOSCB {
                     58:     int rcbid;
                     59:     RBDAIOCB *acb;
                     60:     struct BDRVRBDState *s;
                     61:     int done;
                     62:     int64_t segsize;
                     63:     char *buf;
                     64:     int ret;
                     65: } RADOSCB;
                     66: 
                     67: #define RBD_FD_READ 0
                     68: #define RBD_FD_WRITE 1
                     69: 
                     70: typedef struct BDRVRBDState {
                     71:     int fds[2];
                     72:     rados_pool_t pool;
                     73:     rados_pool_t header_pool;
                     74:     char name[RBD_MAX_OBJ_NAME_SIZE];
                     75:     char block_name[RBD_MAX_BLOCK_NAME_SIZE];
                     76:     uint64_t size;
                     77:     uint64_t objsize;
                     78:     int qemu_aio_count;
                     79:     int event_reader_pos;
                     80:     RADOSCB *event_rcb;
                     81: } BDRVRBDState;
                     82: 
                     83: typedef struct rbd_obj_header_ondisk RbdHeader1;
                     84: 
                     85: static void rbd_aio_bh_cb(void *opaque);
                     86: 
                     87: static int rbd_next_tok(char *dst, int dst_len,
                     88:                         char *src, char delim,
                     89:                         const char *name,
                     90:                         char **p)
                     91: {
                     92:     int l;
                     93:     char *end;
                     94: 
                     95:     *p = NULL;
                     96: 
                     97:     if (delim != '\0') {
                     98:         end = strchr(src, delim);
                     99:         if (end) {
                    100:             *p = end + 1;
                    101:             *end = '\0';
                    102:         }
                    103:     }
                    104:     l = strlen(src);
                    105:     if (l >= dst_len) {
                    106:         error_report("%s too long", name);
                    107:         return -EINVAL;
                    108:     } else if (l == 0) {
                    109:         error_report("%s too short", name);
                    110:         return -EINVAL;
                    111:     }
                    112: 
                    113:     pstrcpy(dst, dst_len, src);
                    114: 
                    115:     return 0;
                    116: }
                    117: 
                    118: static int rbd_parsename(const char *filename,
                    119:                          char *pool, int pool_len,
                    120:                          char *snap, int snap_len,
                    121:                          char *name, int name_len)
                    122: {
                    123:     const char *start;
                    124:     char *p, *buf;
                    125:     int ret;
                    126: 
                    127:     if (!strstart(filename, "rbd:", &start)) {
                    128:         return -EINVAL;
                    129:     }
                    130: 
                    131:     buf = qemu_strdup(start);
                    132:     p = buf;
                    133: 
                    134:     ret = rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
                    135:     if (ret < 0 || !p) {
                    136:         ret = -EINVAL;
                    137:         goto done;
                    138:     }
                    139:     ret = rbd_next_tok(name, name_len, p, '@', "object name", &p);
                    140:     if (ret < 0) {
                    141:         goto done;
                    142:     }
                    143:     if (!p) {
                    144:         *snap = '\0';
                    145:         goto done;
                    146:     }
                    147: 
                    148:     ret = rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p);
                    149: 
                    150: done:
                    151:     qemu_free(buf);
                    152:     return ret;
                    153: }
                    154: 
                    155: static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
                    156: {
                    157:     uint32_t len = strlen(name);
                    158:     uint32_t len_le = cpu_to_le32(len);
                    159:     /* total_len = encoding op + name + empty buffer */
                    160:     uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
                    161:     uint8_t *desc = NULL;
                    162: 
                    163:     desc = qemu_malloc(total_len);
                    164: 
                    165:     *tmap_desc = (char *)desc;
                    166: 
                    167:     *desc = op;
                    168:     desc++;
                    169:     memcpy(desc, &len_le, sizeof(len_le));
                    170:     desc += sizeof(len_le);
                    171:     memcpy(desc, name, len);
                    172:     desc += len;
                    173:     len = 0; /* no need for endian conversion for 0 */
                    174:     memcpy(desc, &len, sizeof(len));
                    175:     desc += sizeof(len);
                    176: 
                    177:     return (char *)desc - *tmap_desc;
                    178: }
                    179: 
                    180: static void free_tmap_op(char *tmap_desc)
                    181: {
                    182:     qemu_free(tmap_desc);
                    183: }
                    184: 
                    185: static int rbd_register_image(rados_pool_t pool, const char *name)
                    186: {
                    187:     char *tmap_desc;
                    188:     const char *dir = RBD_DIRECTORY;
                    189:     int ret;
                    190: 
                    191:     ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
                    192:     if (ret < 0) {
                    193:         return ret;
                    194:     }
                    195: 
                    196:     ret = rados_tmap_update(pool, dir, tmap_desc, ret);
                    197:     free_tmap_op(tmap_desc);
                    198: 
                    199:     return ret;
                    200: }
                    201: 
                    202: static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
                    203: {
                    204:     int r = rados_write(pool, info_oid, 0, NULL, 0);
                    205:     if (r < 0) {
                    206:         return r;
                    207:     }
                    208:     return 0;
                    209: }
                    210: 
                    211: static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
                    212: {
                    213:     uint64_t out[1];
                    214:     const char *info_oid = RBD_INFO;
                    215: 
                    216:     *id = 0;
                    217: 
                    218:     int r = touch_rbd_info(pool, info_oid);
                    219:     if (r < 0) {
                    220:         return r;
                    221:     }
                    222: 
                    223:     r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL,
                    224:                    0, (char *)out, sizeof(out));
                    225:     if (r < 0) {
                    226:         return r;
                    227:     }
                    228: 
                    229:     le64_to_cpus(out);
                    230:     *id = out[0];
                    231: 
                    232:     return 0;
                    233: }
                    234: 
                    235: static int rbd_create(const char *filename, QEMUOptionParameter *options)
                    236: {
                    237:     int64_t bytes = 0;
                    238:     int64_t objsize;
                    239:     uint64_t size;
                    240:     time_t mtime;
                    241:     uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
                    242:     char pool[RBD_MAX_SEG_NAME_SIZE];
                    243:     char n[RBD_MAX_SEG_NAME_SIZE];
                    244:     char name[RBD_MAX_OBJ_NAME_SIZE];
                    245:     char snap_buf[RBD_MAX_SEG_NAME_SIZE];
                    246:     char *snap = NULL;
                    247:     RbdHeader1 header;
                    248:     rados_pool_t p;
                    249:     uint64_t bid;
                    250:     uint32_t hi, lo;
                    251:     int ret;
                    252: 
                    253:     if (rbd_parsename(filename,
                    254:                       pool, sizeof(pool),
                    255:                       snap_buf, sizeof(snap_buf),
                    256:                       name, sizeof(name)) < 0) {
                    257:         return -EINVAL;
                    258:     }
                    259:     if (snap_buf[0] != '\0') {
                    260:         snap = snap_buf;
                    261:     }
                    262: 
                    263:     snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX);
                    264: 
                    265:     /* Read out options */
                    266:     while (options && options->name) {
                    267:         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
                    268:             bytes = options->value.n;
                    269:         } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
                    270:             if (options->value.n) {
                    271:                 objsize = options->value.n;
                    272:                 if ((objsize - 1) & objsize) {    /* not a power of 2? */
                    273:                     error_report("obj size needs to be power of 2");
                    274:                     return -EINVAL;
                    275:                 }
                    276:                 if (objsize < 4096) {
                    277:                     error_report("obj size too small");
                    278:                     return -EINVAL;
                    279:                 }
                    280:                obj_order = ffs(objsize) - 1;
                    281:             }
                    282:         }
                    283:         options++;
                    284:     }
                    285: 
                    286:     memset(&header, 0, sizeof(header));
                    287:     pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
                    288:     pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
                    289:     pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
                    290:     header.image_size = cpu_to_le64(bytes);
                    291:     header.options.order = obj_order;
                    292:     header.options.crypt_type = RBD_CRYPT_NONE;
                    293:     header.options.comp_type = RBD_COMP_NONE;
                    294:     header.snap_seq = 0;
                    295:     header.snap_count = 0;
                    296: 
                    297:     if (rados_initialize(0, NULL) < 0) {
                    298:         error_report("error initializing");
                    299:         return -EIO;
                    300:     }
                    301: 
                    302:     if (rados_open_pool(pool, &p)) {
                    303:         error_report("error opening pool %s", pool);
                    304:         rados_deinitialize();
                    305:         return -EIO;
                    306:     }
                    307: 
                    308:     /* check for existing rbd header file */
                    309:     ret = rados_stat(p, n, &size, &mtime);
                    310:     if (ret == 0) {
                    311:         ret=-EEXIST;
                    312:         goto done;
                    313:     }
                    314: 
                    315:     ret = rbd_assign_bid(p, &bid);
                    316:     if (ret < 0) {
                    317:         error_report("failed assigning block id");
                    318:         rados_deinitialize();
                    319:         return -EIO;
                    320:     }
                    321:     hi = bid >> 32;
                    322:     lo = bid & 0xFFFFFFFF;
                    323:     snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);
                    324: 
                    325:     /* create header file */
                    326:     ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
                    327:     if (ret < 0) {
                    328:         goto done;
                    329:     }
                    330: 
                    331:     ret = rbd_register_image(p, name);
                    332: done:
                    333:     rados_close_pool(p);
                    334:     rados_deinitialize();
                    335: 
                    336:     return ret;
                    337: }
                    338: 
                    339: /*
                    340:  * This aio completion is being called from rbd_aio_event_reader() and
                    341:  * runs in qemu context. It schedules a bh, but just in case the aio
                    342:  * was not cancelled before.
                    343:  */
                    344: static void rbd_complete_aio(RADOSCB *rcb)
                    345: {
                    346:     RBDAIOCB *acb = rcb->acb;
                    347:     int64_t r;
                    348: 
                    349:     acb->aiocnt--;
                    350: 
                    351:     if (acb->cancelled) {
                    352:         if (!acb->aiocnt) {
                    353:             qemu_vfree(acb->bounce);
                    354:             qemu_aio_release(acb);
                    355:         }
                    356:         goto done;
                    357:     }
                    358: 
                    359:     r = rcb->ret;
                    360: 
                    361:     if (acb->write) {
                    362:         if (r < 0) {
                    363:             acb->ret = r;
                    364:             acb->error = 1;
                    365:         } else if (!acb->error) {
                    366:             acb->ret += rcb->segsize;
                    367:         }
                    368:     } else {
                    369:         if (r == -ENOENT) {
                    370:             memset(rcb->buf, 0, rcb->segsize);
                    371:             if (!acb->error) {
                    372:                 acb->ret += rcb->segsize;
                    373:             }
                    374:         } else if (r < 0) {
                    375:            memset(rcb->buf, 0, rcb->segsize);
                    376:             acb->ret = r;
                    377:             acb->error = 1;
                    378:         } else if (r < rcb->segsize) {
                    379:             memset(rcb->buf + r, 0, rcb->segsize - r);
                    380:             if (!acb->error) {
                    381:                 acb->ret += rcb->segsize;
                    382:             }
                    383:         } else if (!acb->error) {
                    384:             acb->ret += r;
                    385:         }
                    386:     }
                    387:     /* Note that acb->bh can be NULL in case where the aio was cancelled */
                    388:     if (!acb->aiocnt) {
                    389:         acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
                    390:         qemu_bh_schedule(acb->bh);
                    391:     }
                    392: done:
                    393:     qemu_free(rcb);
                    394: }
                    395: 
                    396: /*
                    397:  * aio fd read handler. It runs in the qemu context and calls the
                    398:  * completion handling of completed rados aio operations.
                    399:  */
                    400: static void rbd_aio_event_reader(void *opaque)
                    401: {
                    402:     BDRVRBDState *s = opaque;
                    403: 
                    404:     ssize_t ret;
                    405: 
                    406:     do {
                    407:         char *p = (char *)&s->event_rcb;
                    408: 
                    409:         /* now read the rcb pointer that was sent from a non qemu thread */
                    410:         if ((ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
                    411:                         sizeof(s->event_rcb) - s->event_reader_pos)) > 0) {
                    412:             if (ret > 0) {
                    413:                 s->event_reader_pos += ret;
                    414:                 if (s->event_reader_pos == sizeof(s->event_rcb)) {
                    415:                     s->event_reader_pos = 0;
                    416:                     rbd_complete_aio(s->event_rcb);
                    417:                     s->qemu_aio_count --;
                    418:                 }
                    419:             }
                    420:         }
                    421:     } while (ret < 0 && errno == EINTR);
                    422: }
                    423: 
                    424: static int rbd_aio_flush_cb(void *opaque)
                    425: {
                    426:     BDRVRBDState *s = opaque;
                    427: 
                    428:     return (s->qemu_aio_count > 0);
                    429: }
                    430: 
                    431: 
                    432: static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
                    433: {
                    434:     uint32_t snap_count = le32_to_cpu(header->snap_count);
                    435:     rados_snap_t *snaps = NULL;
                    436:     rados_snap_t seq;
                    437:     uint32_t i;
                    438:     uint64_t snap_names_len = le64_to_cpu(header->snap_names_len);
                    439:     int r;
                    440:     rados_snap_t snapid = 0;
                    441: 
                    442:     if (snap_count) {
                    443:         const char *header_snap = (const char *)&header->snaps[snap_count];
                    444:         const char *end = header_snap + snap_names_len;
                    445:         snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count);
                    446: 
                    447:         for (i=0; i < snap_count; i++) {
                    448:             snaps[i] = le64_to_cpu(header->snaps[i].id);
                    449: 
                    450:             if (snap && strcmp(snap, header_snap) == 0) {
                    451:                 snapid = snaps[i];
                    452:             }
                    453: 
                    454:             header_snap += strlen(header_snap) + 1;
                    455:             if (header_snap > end) {
                    456:                 error_report("bad header, snapshot list broken");
                    457:             }
                    458:         }
                    459:     }
                    460: 
                    461:     if (snap && !snapid) {
                    462:         error_report("snapshot not found");
                    463:         qemu_free(snaps);
                    464:         return -ENOENT;
                    465:     }
                    466:     seq = le32_to_cpu(header->snap_seq);
                    467: 
                    468:     r = rados_set_snap_context(pool, seq, snaps, snap_count);
                    469: 
                    470:     rados_set_snap(pool, snapid);
                    471: 
                    472:     qemu_free(snaps);
                    473: 
                    474:     return r;
                    475: }
                    476: 
                    477: #define BUF_READ_START_LEN    4096
                    478: 
                    479: static int rbd_read_header(BDRVRBDState *s, char **hbuf)
                    480: {
                    481:     char *buf = NULL;
                    482:     char n[RBD_MAX_SEG_NAME_SIZE];
                    483:     uint64_t len = BUF_READ_START_LEN;
                    484:     int r;
                    485: 
                    486:     snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
                    487: 
                    488:     buf = qemu_malloc(len);
                    489: 
                    490:     r = rados_read(s->header_pool, n, 0, buf, len);
                    491:     if (r < 0) {
                    492:         goto failed;
                    493:     }
                    494: 
                    495:     if (r < len) {
                    496:         goto done;
                    497:     }
                    498: 
                    499:     qemu_free(buf);
                    500:     buf = qemu_malloc(len);
                    501: 
                    502:     r = rados_stat(s->header_pool, n, &len, NULL);
                    503:     if (r < 0) {
                    504:         goto failed;
                    505:     }
                    506: 
                    507:     r = rados_read(s->header_pool, n, 0, buf, len);
                    508:     if (r < 0) {
                    509:         goto failed;
                    510:     }
                    511: 
                    512: done:
                    513:     *hbuf = buf;
                    514:     return 0;
                    515: 
                    516: failed:
                    517:     qemu_free(buf);
                    518:     return r;
                    519: }
                    520: 
                    521: static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
                    522: {
                    523:     BDRVRBDState *s = bs->opaque;
                    524:     RbdHeader1 *header;
                    525:     char pool[RBD_MAX_SEG_NAME_SIZE];
                    526:     char snap_buf[RBD_MAX_SEG_NAME_SIZE];
                    527:     char *snap = NULL;
                    528:     char *hbuf = NULL;
                    529:     int r;
                    530: 
                    531:     if (rbd_parsename(filename, pool, sizeof(pool),
                    532:                       snap_buf, sizeof(snap_buf),
                    533:                       s->name, sizeof(s->name)) < 0) {
                    534:         return -EINVAL;
                    535:     }
                    536:     if (snap_buf[0] != '\0') {
                    537:         snap = snap_buf;
                    538:     }
                    539: 
                    540:     if ((r = rados_initialize(0, NULL)) < 0) {
                    541:         error_report("error initializing");
                    542:         return r;
                    543:     }
                    544: 
                    545:     if ((r = rados_open_pool(pool, &s->pool))) {
                    546:         error_report("error opening pool %s", pool);
                    547:         rados_deinitialize();
                    548:         return r;
                    549:     }
                    550: 
                    551:     if ((r = rados_open_pool(pool, &s->header_pool))) {
                    552:         error_report("error opening pool %s", pool);
                    553:         rados_deinitialize();
                    554:         return r;
                    555:     }
                    556: 
                    557:     if ((r = rbd_read_header(s, &hbuf)) < 0) {
                    558:         error_report("error reading header from %s", s->name);
                    559:         goto failed;
                    560:     }
                    561: 
                    562:     if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
                    563:         error_report("Invalid header signature");
                    564:         r = -EMEDIUMTYPE;
                    565:         goto failed;
                    566:     }
                    567: 
                    568:     if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
                    569:         error_report("Unknown image version");
                    570:         r = -EMEDIUMTYPE;
                    571:         goto failed;
                    572:     }
                    573: 
                    574:     header = (RbdHeader1 *) hbuf;
                    575:     s->size = le64_to_cpu(header->image_size);
                    576:     s->objsize = 1ULL << header->options.order;
                    577:     memcpy(s->block_name, header->block_name, sizeof(header->block_name));
                    578: 
                    579:     r = rbd_set_snapc(s->pool, snap, header);
                    580:     if (r < 0) {
                    581:         error_report("failed setting snap context: %s", strerror(-r));
                    582:         goto failed;
                    583:     }
                    584: 
                    585:     bs->read_only = (snap != NULL);
                    586: 
                    587:     s->event_reader_pos = 0;
                    588:     r = qemu_pipe(s->fds);
                    589:     if (r < 0) {
                    590:         error_report("error opening eventfd");
                    591:         goto failed;
                    592:     }
                    593:     fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
                    594:     fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
                    595:     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL,
                    596:         rbd_aio_flush_cb, NULL, s);
                    597: 
                    598:     qemu_free(hbuf);
                    599: 
                    600:     return 0;
                    601: 
                    602: failed:
                    603:     qemu_free(hbuf);
                    604: 
                    605:     rados_close_pool(s->header_pool);
                    606:     rados_close_pool(s->pool);
                    607:     rados_deinitialize();
                    608:     return r;
                    609: }
                    610: 
                    611: static void rbd_close(BlockDriverState *bs)
                    612: {
                    613:     BDRVRBDState *s = bs->opaque;
                    614: 
                    615:     close(s->fds[0]);
                    616:     close(s->fds[1]);
                    617:     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
                    618:         NULL);
                    619: 
                    620:     rados_close_pool(s->header_pool);
                    621:     rados_close_pool(s->pool);
                    622:     rados_deinitialize();
                    623: }
                    624: 
                    625: /*
                    626:  * Cancel aio. Since we don't reference acb in a non qemu threads,
                    627:  * it is safe to access it here.
                    628:  */
                    629: static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
                    630: {
                    631:     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
                    632:     acb->cancelled = 1;
                    633: }
                    634: 
                    635: static AIOPool rbd_aio_pool = {
                    636:     .aiocb_size = sizeof(RBDAIOCB),
                    637:     .cancel = rbd_aio_cancel,
                    638: };
                    639: 
                    640: /*
                    641:  * This is the callback function for rados_aio_read and _write
                    642:  *
                    643:  * Note: this function is being called from a non qemu thread so
                    644:  * we need to be careful about what we do here. Generally we only
                    645:  * write to the block notification pipe, and do the rest of the
                    646:  * io completion handling from rbd_aio_event_reader() which
                    647:  * runs in a qemu context.
                    648:  */
                    649: static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
                    650: {
                    651:     int ret;
                    652:     rcb->ret = rados_aio_get_return_value(c);
                    653:     rados_aio_release(c);
                    654:     while (1) {
                    655:         fd_set wfd;
                    656:         int fd = rcb->s->fds[RBD_FD_WRITE];
                    657: 
                    658:         /* send the rcb pointer to the qemu thread that is responsible
                    659:            for the aio completion. Must do it in a qemu thread context */
                    660:         ret = write(fd, (void *)&rcb, sizeof(rcb));
                    661:         if (ret >= 0) {
                    662:             break;
                    663:         }
                    664:         if (errno == EINTR) {
                    665:             continue;
                    666:        }
                    667:         if (errno != EAGAIN) {
                    668:             break;
                    669:        }
                    670: 
                    671:         FD_ZERO(&wfd);
                    672:         FD_SET(fd, &wfd);
                    673:         do {
                    674:             ret = select(fd + 1, NULL, &wfd, NULL, NULL);
                    675:         } while (ret < 0 && errno == EINTR);
                    676:     }
                    677: 
                    678:     if (ret < 0) {
                    679:         error_report("failed writing to acb->s->fds\n");
                    680:         qemu_free(rcb);
                    681:     }
                    682: }
                    683: 
                    684: /* Callback when all queued rados_aio requests are complete */
                    685: 
                    686: static void rbd_aio_bh_cb(void *opaque)
                    687: {
                    688:     RBDAIOCB *acb = opaque;
                    689: 
                    690:     if (!acb->write) {
                    691:         qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
                    692:     }
                    693:     qemu_vfree(acb->bounce);
                    694:     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
                    695:     qemu_bh_delete(acb->bh);
                    696:     acb->bh = NULL;
                    697: 
                    698:     qemu_aio_release(acb);
                    699: }
                    700: 
                    701: static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
                    702:                                            int64_t sector_num,
                    703:                                            QEMUIOVector *qiov,
                    704:                                            int nb_sectors,
                    705:                                            BlockDriverCompletionFunc *cb,
                    706:                                            void *opaque, int write)
                    707: {
                    708:     RBDAIOCB *acb;
                    709:     RADOSCB *rcb;
                    710:     rados_completion_t c;
                    711:     char n[RBD_MAX_SEG_NAME_SIZE];
                    712:     int64_t segnr, segoffs, segsize, last_segnr;
                    713:     int64_t off, size;
                    714:     char *buf;
                    715: 
                    716:     BDRVRBDState *s = bs->opaque;
                    717: 
                    718:     acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
                    719:     acb->write = write;
                    720:     acb->qiov = qiov;
                    721:     acb->bounce = qemu_blockalign(bs, qiov->size);
                    722:     acb->aiocnt = 0;
                    723:     acb->ret = 0;
                    724:     acb->error = 0;
                    725:     acb->s = s;
                    726:     acb->cancelled = 0;
                    727:     acb->bh = NULL;
                    728: 
                    729:     if (write) {
                    730:         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
                    731:     }
                    732: 
                    733:     buf = acb->bounce;
                    734: 
                    735:     off = sector_num * BDRV_SECTOR_SIZE;
                    736:     size = nb_sectors * BDRV_SECTOR_SIZE;
                    737:     segnr = off / s->objsize;
                    738:     segoffs = off % s->objsize;
                    739:     segsize = s->objsize - segoffs;
                    740: 
                    741:     last_segnr = ((off + size - 1) / s->objsize);
                    742:     acb->aiocnt = (last_segnr - segnr) + 1;
                    743: 
                    744:     s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
                    745: 
                    746:     while (size > 0) {
                    747:         if (size < segsize) {
                    748:             segsize = size;
                    749:         }
                    750: 
                    751:         snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name,
                    752:                  segnr);
                    753: 
                    754:         rcb = qemu_malloc(sizeof(RADOSCB));
                    755:         rcb->done = 0;
                    756:         rcb->acb = acb;
                    757:         rcb->segsize = segsize;
                    758:         rcb->buf = buf;
                    759:         rcb->s = acb->s;
                    760: 
                    761:         if (write) {
                    762:             rados_aio_create_completion(rcb, NULL,
                    763:                                         (rados_callback_t) rbd_finish_aiocb,
                    764:                                         &c);
                    765:             rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
                    766:         } else {
                    767:             rados_aio_create_completion(rcb,
                    768:                                         (rados_callback_t) rbd_finish_aiocb,
                    769:                                         NULL, &c);
                    770:             rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
                    771:         }
                    772: 
                    773:         buf += segsize;
                    774:         size -= segsize;
                    775:         segoffs = 0;
                    776:         segsize = s->objsize;
                    777:         segnr++;
                    778:     }
                    779: 
                    780:     return &acb->common;
                    781: }
                    782: 
                    783: static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
                    784:                                        int64_t sector_num, QEMUIOVector * qiov,
                    785:                                        int nb_sectors,
                    786:                                        BlockDriverCompletionFunc * cb,
                    787:                                        void *opaque)
                    788: {
                    789:     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
                    790: }
                    791: 
                    792: static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
                    793:                                         int64_t sector_num, QEMUIOVector * qiov,
                    794:                                         int nb_sectors,
                    795:                                         BlockDriverCompletionFunc * cb,
                    796:                                         void *opaque)
                    797: {
                    798:     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
                    799: }
                    800: 
                    801: static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
                    802: {
                    803:     BDRVRBDState *s = bs->opaque;
                    804:     bdi->cluster_size = s->objsize;
                    805:     return 0;
                    806: }
                    807: 
                    808: static int64_t rbd_getlength(BlockDriverState * bs)
                    809: {
                    810:     BDRVRBDState *s = bs->opaque;
                    811: 
                    812:     return s->size;
                    813: }
                    814: 
                    815: static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
                    816: {
                    817:     BDRVRBDState *s = bs->opaque;
                    818:     char inbuf[512], outbuf[128];
                    819:     uint64_t snap_id;
                    820:     int r;
                    821:     char *p = inbuf;
                    822:     char *end = inbuf + sizeof(inbuf);
                    823:     char n[RBD_MAX_SEG_NAME_SIZE];
                    824:     char *hbuf = NULL;
                    825:     RbdHeader1 *header;
                    826: 
                    827:     if (sn_info->name[0] == '\0') {
                    828:         return -EINVAL; /* we need a name for rbd snapshots */
                    829:     }
                    830: 
                    831:     /*
                    832:      * rbd snapshots are using the name as the user controlled unique identifier
                    833:      * we can't use the rbd snapid for that purpose, as it can't be set
                    834:      */
                    835:     if (sn_info->id_str[0] != '\0' &&
                    836:         strcmp(sn_info->id_str, sn_info->name) != 0) {
                    837:         return -EINVAL;
                    838:     }
                    839: 
                    840:     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
                    841:         return -ERANGE;
                    842:     }
                    843: 
                    844:     r = rados_selfmanaged_snap_create(s->header_pool, &snap_id);
                    845:     if (r < 0) {
                    846:         error_report("failed to create snap id: %s", strerror(-r));
                    847:         return r;
                    848:     }
                    849: 
                    850:     *(uint32_t *)p = strlen(sn_info->name);
                    851:     cpu_to_le32s((uint32_t *)p);
                    852:     p += sizeof(uint32_t);
                    853:     strncpy(p, sn_info->name, end - p);
                    854:     p += strlen(p);
                    855:     if (p + sizeof(snap_id) > end) {
                    856:         error_report("invalid input parameter");
                    857:         return -EINVAL;
                    858:     }
                    859: 
                    860:     *(uint64_t *)p = snap_id;
                    861:     cpu_to_le64s((uint64_t *)p);
                    862: 
                    863:     snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
                    864: 
                    865:     r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf,
                    866:                    sizeof(inbuf), outbuf, sizeof(outbuf));
                    867:     if (r < 0) {
                    868:         error_report("rbd.snap_add execution failed failed: %s", strerror(-r));
                    869:         return r;
                    870:     }
                    871: 
                    872:     sprintf(sn_info->id_str, "%s", sn_info->name);
                    873: 
                    874:     r = rbd_read_header(s, &hbuf);
                    875:     if (r < 0) {
                    876:         error_report("failed reading header: %s", strerror(-r));
                    877:         return r;
                    878:     }
                    879: 
                    880:     header = (RbdHeader1 *) hbuf;
                    881:     r = rbd_set_snapc(s->pool, sn_info->name, header);
                    882:     if (r < 0) {
                    883:         error_report("failed setting snap context: %s", strerror(-r));
                    884:         goto failed;
                    885:     }
                    886: 
                    887:     return 0;
                    888: 
                    889: failed:
                    890:     qemu_free(header);
                    891:     return r;
                    892: }
                    893: 
                    894: static int decode32(char **p, const char *end, uint32_t *v)
                    895: {
                    896:     if (*p + 4 > end) {
                    897:        return -ERANGE;
                    898:     }
                    899: 
                    900:     *v = *(uint32_t *)(*p);
                    901:     le32_to_cpus(v);
                    902:     *p += 4;
                    903:     return 0;
                    904: }
                    905: 
                    906: static int decode64(char **p, const char *end, uint64_t *v)
                    907: {
                    908:     if (*p + 8 > end) {
                    909:         return -ERANGE;
                    910:     }
                    911: 
                    912:     *v = *(uint64_t *)(*p);
                    913:     le64_to_cpus(v);
                    914:     *p += 8;
                    915:     return 0;
                    916: }
                    917: 
                    918: static int decode_str(char **p, const char *end, char **s)
                    919: {
                    920:     uint32_t len;
                    921:     int r;
                    922: 
                    923:     if ((r = decode32(p, end, &len)) < 0) {
                    924:         return r;
                    925:     }
                    926: 
                    927:     *s = qemu_malloc(len + 1);
                    928:     memcpy(*s, *p, len);
                    929:     *p += len;
                    930:     (*s)[len] = '\0';
                    931: 
                    932:     return len;
                    933: }
                    934: 
                    935: static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
                    936: {
                    937:     BDRVRBDState *s = bs->opaque;
                    938:     char n[RBD_MAX_SEG_NAME_SIZE];
                    939:     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
                    940:     RbdHeader1 *header;
                    941:     char *hbuf = NULL;
                    942:     char *outbuf = NULL, *end, *buf;
                    943:     uint64_t len;
                    944:     uint64_t snap_seq;
                    945:     uint32_t snap_count;
                    946:     int r, i;
                    947: 
                    948:     /* read header to estimate how much space we need to read the snap
                    949:      * list */
                    950:     if ((r = rbd_read_header(s, &hbuf)) < 0) {
                    951:         goto done_err;
                    952:     }
                    953:     header = (RbdHeader1 *)hbuf;
                    954:     len = le64_to_cpu(header->snap_names_len);
                    955:     len += 1024; /* should have already been enough, but new snapshots might
                    956:                     already been created since we read the header. just allocate
                    957:                     a bit more, so that in most cases it'll suffice anyway */
                    958:     qemu_free(hbuf);
                    959: 
                    960:     snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
                    961:     while (1) {
                    962:         qemu_free(outbuf);
                    963:         outbuf = qemu_malloc(len);
                    964: 
                    965:         r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0,
                    966:                        outbuf, len);
                    967:         if (r < 0) {
                    968:             error_report("rbd.snap_list execution failed failed: %s", strerror(-r));
                    969:             goto done_err;
                    970:         }
                    971:         if (r != len) {
                    972:             break;
                    973:        }
                    974: 
                    975:         /* if we're here, we probably raced with some snaps creation */
                    976:         len *= 2;
                    977:     }
                    978:     buf = outbuf;
                    979:     end = buf + len;
                    980: 
                    981:     if ((r = decode64(&buf, end, &snap_seq)) < 0) {
                    982:         goto done_err;
                    983:     }
                    984:     if ((r = decode32(&buf, end, &snap_count)) < 0) {
                    985:         goto done_err;
                    986:     }
                    987: 
                    988:     sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo));
                    989:     for (i = 0; i < snap_count; i++) {
                    990:         uint64_t id, image_size;
                    991:         char *snap_name;
                    992: 
                    993:         if ((r = decode64(&buf, end, &id)) < 0) {
                    994:             goto done_err;
                    995:         }
                    996:         if ((r = decode64(&buf, end, &image_size)) < 0) {
                    997:             goto done_err;
                    998:         }
                    999:         if ((r = decode_str(&buf, end, &snap_name)) < 0) {
                   1000:             goto done_err;
                   1001:         }
                   1002: 
                   1003:         sn_info = sn_tab + i;
                   1004:         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
                   1005:         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
                   1006:         qemu_free(snap_name);
                   1007: 
                   1008:         sn_info->vm_state_size = image_size;
                   1009:         sn_info->date_sec = 0;
                   1010:         sn_info->date_nsec = 0;
                   1011:         sn_info->vm_clock_nsec = 0;
                   1012:     }
                   1013:     *psn_tab = sn_tab;
                   1014:     qemu_free(outbuf);
                   1015:     return snap_count;
                   1016: done_err:
                   1017:     qemu_free(sn_tab);
                   1018:     qemu_free(outbuf);
                   1019:     return r;
                   1020: }
                   1021: 
                   1022: static QEMUOptionParameter rbd_create_options[] = {
                   1023:     {
                   1024:      .name = BLOCK_OPT_SIZE,
                   1025:      .type = OPT_SIZE,
                   1026:      .help = "Virtual disk size"
                   1027:     },
                   1028:     {
                   1029:      .name = BLOCK_OPT_CLUSTER_SIZE,
                   1030:      .type = OPT_SIZE,
                   1031:      .help = "RBD object size"
                   1032:     },
                   1033:     {NULL}
                   1034: };
                   1035: 
                   1036: static BlockDriver bdrv_rbd = {
                   1037:     .format_name        = "rbd",
                   1038:     .instance_size      = sizeof(BDRVRBDState),
                   1039:     .bdrv_file_open     = rbd_open,
                   1040:     .bdrv_close         = rbd_close,
                   1041:     .bdrv_create        = rbd_create,
                   1042:     .bdrv_get_info      = rbd_getinfo,
                   1043:     .create_options     = rbd_create_options,
                   1044:     .bdrv_getlength     = rbd_getlength,
                   1045:     .protocol_name      = "rbd",
                   1046: 
                   1047:     .bdrv_aio_readv     = rbd_aio_readv,
                   1048:     .bdrv_aio_writev    = rbd_aio_writev,
                   1049: 
                   1050:     .bdrv_snapshot_create = rbd_snap_create,
                   1051:     .bdrv_snapshot_list = rbd_snap_list,
                   1052: };
                   1053: 
                   1054: static void bdrv_rbd_init(void)
                   1055: {
                   1056:     bdrv_register(&bdrv_rbd);
                   1057: }
                   1058: 
                   1059: block_init(bdrv_rbd_init);

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.