Annotation of qemu/block/rbd.c, revision 1.1.1.3

1.1       root        1: /*
                      2:  * QEMU Block driver for RADOS (Ceph)
                      3:  *
1.1.1.2   root        4:  * Copyright (C) 2010-2011 Christian Brunner <[email protected]>,
                      5:  *                         Josh Durgin <[email protected]>
1.1       root        6:  *
                      7:  * This work is licensed under the terms of the GNU GPL, version 2.  See
                      8:  * the COPYING file in the top-level directory.
                      9:  *
                     10:  */
                     11: 
1.1.1.2   root       12: #include <inttypes.h>
                     13: 
1.1       root       14: #include "qemu-common.h"
                     15: #include "qemu-error.h"
                     16: #include "block_int.h"
                     17: 
1.1.1.2   root       18: #include <rbd/librbd.h>
1.1       root       19: 
                     20: /*
                     21:  * When specifying the image filename use:
                     22:  *
1.1.1.2   root       23:  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
1.1       root       24:  *
1.1.1.3 ! root       25:  * poolname must be the name of an existing rados pool.
1.1       root       26:  *
1.1.1.3 ! root       27:  * devicename is the name of the rbd image.
1.1       root       28:  *
1.1.1.3 ! root       29:  * Each option given is used to configure rados, and may be any valid
        !            30:  * Ceph option, "id", or "conf".
1.1.1.2   root       31:  *
1.1.1.3 ! root       32:  * The "id" option indicates what user we should authenticate as to
        !            33:  * the Ceph cluster.  If it is excluded we will use the Ceph default
        !            34:  * (normally 'admin').
1.1       root       35:  *
1.1.1.3 ! root       36:  * The "conf" option specifies a Ceph configuration file to read.  If
        !            37:  * it is not specified, we will read from the default Ceph locations
        !            38:  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
        !            39:  * file, specify conf=/dev/null.
1.1       root       40:  *
1.1.1.3 ! root       41:  * Configuration values containing :, @, or = can be escaped with a
        !            42:  * leading "\".
1.1       root       43:  */
                     44: 
                     45: #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
                     46: 
1.1.1.2   root       47: #define RBD_MAX_CONF_NAME_SIZE 128
                     48: #define RBD_MAX_CONF_VAL_SIZE 512
                     49: #define RBD_MAX_CONF_SIZE 1024
                     50: #define RBD_MAX_POOL_NAME_SIZE 128
                     51: #define RBD_MAX_SNAP_NAME_SIZE 128
                     52: #define RBD_MAX_SNAPS 100
                     53: 
1.1       root       54: typedef struct RBDAIOCB {
                     55:     BlockDriverAIOCB common;
                     56:     QEMUBH *bh;
                     57:     int ret;
                     58:     QEMUIOVector *qiov;
                     59:     char *bounce;
                     60:     int write;
                     61:     int64_t sector_num;
                     62:     int error;
                     63:     struct BDRVRBDState *s;
                     64:     int cancelled;
                     65: } RBDAIOCB;
                     66: 
                     67: typedef struct RADOSCB {
                     68:     int rcbid;
                     69:     RBDAIOCB *acb;
                     70:     struct BDRVRBDState *s;
                     71:     int done;
1.1.1.2   root       72:     int64_t size;
1.1       root       73:     char *buf;
                     74:     int ret;
                     75: } RADOSCB;
                     76: 
                     77: #define RBD_FD_READ 0
                     78: #define RBD_FD_WRITE 1
                     79: 
                     80: typedef struct BDRVRBDState {
                     81:     int fds[2];
1.1.1.2   root       82:     rados_t cluster;
                     83:     rados_ioctx_t io_ctx;
                     84:     rbd_image_t image;
                     85:     char name[RBD_MAX_IMAGE_NAME_SIZE];
1.1       root       86:     int qemu_aio_count;
1.1.1.2   root       87:     char *snap;
1.1       root       88:     int event_reader_pos;
                     89:     RADOSCB *event_rcb;
                     90: } BDRVRBDState;
                     91: 
                     92: static void rbd_aio_bh_cb(void *opaque);
                     93: 
1.1.1.2   root       94: static int qemu_rbd_next_tok(char *dst, int dst_len,
                     95:                              char *src, char delim,
                     96:                              const char *name,
                     97:                              char **p)
1.1       root       98: {
                     99:     int l;
                    100:     char *end;
                    101: 
                    102:     *p = NULL;
                    103: 
                    104:     if (delim != '\0') {
1.1.1.3 ! root      105:         for (end = src; *end; ++end) {
        !           106:             if (*end == delim) {
        !           107:                 break;
        !           108:             }
        !           109:             if (*end == '\\' && end[1] != '\0') {
        !           110:                 end++;
        !           111:             }
        !           112:         }
        !           113:         if (*end == delim) {
1.1       root      114:             *p = end + 1;
                    115:             *end = '\0';
                    116:         }
                    117:     }
                    118:     l = strlen(src);
                    119:     if (l >= dst_len) {
                    120:         error_report("%s too long", name);
                    121:         return -EINVAL;
                    122:     } else if (l == 0) {
                    123:         error_report("%s too short", name);
                    124:         return -EINVAL;
                    125:     }
                    126: 
                    127:     pstrcpy(dst, dst_len, src);
                    128: 
                    129:     return 0;
                    130: }
                    131: 
1.1.1.3 ! root      132: static void qemu_rbd_unescape(char *src)
        !           133: {
        !           134:     char *p;
        !           135: 
        !           136:     for (p = src; *src; ++src, ++p) {
        !           137:         if (*src == '\\' && src[1] != '\0') {
        !           138:             src++;
        !           139:         }
        !           140:         *p = *src;
        !           141:     }
        !           142:     *p = '\0';
        !           143: }
        !           144: 
1.1.1.2   root      145: static int qemu_rbd_parsename(const char *filename,
                    146:                               char *pool, int pool_len,
                    147:                               char *snap, int snap_len,
                    148:                               char *name, int name_len,
                    149:                               char *conf, int conf_len)
1.1       root      150: {
                    151:     const char *start;
                    152:     char *p, *buf;
                    153:     int ret;
                    154: 
                    155:     if (!strstart(filename, "rbd:", &start)) {
                    156:         return -EINVAL;
                    157:     }
                    158: 
1.1.1.3 ! root      159:     buf = g_strdup(start);
1.1       root      160:     p = buf;
1.1.1.2   root      161:     *snap = '\0';
                    162:     *conf = '\0';
1.1       root      163: 
1.1.1.2   root      164:     ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
1.1       root      165:     if (ret < 0 || !p) {
                    166:         ret = -EINVAL;
                    167:         goto done;
                    168:     }
1.1.1.3 ! root      169:     qemu_rbd_unescape(pool);
1.1.1.2   root      170: 
                    171:     if (strchr(p, '@')) {
                    172:         ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
                    173:         if (ret < 0) {
                    174:             goto done;
                    175:         }
                    176:         ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
1.1.1.3 ! root      177:         qemu_rbd_unescape(snap);
1.1.1.2   root      178:     } else {
                    179:         ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
1.1       root      180:     }
1.1.1.3 ! root      181:     qemu_rbd_unescape(name);
1.1.1.2   root      182:     if (ret < 0 || !p) {
1.1       root      183:         goto done;
                    184:     }
                    185: 
1.1.1.2   root      186:     ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
1.1       root      187: 
                    188: done:
1.1.1.3 ! root      189:     g_free(buf);
1.1       root      190:     return ret;
                    191: }
                    192: 
1.1.1.3 ! root      193: static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
        !           194: {
        !           195:     const char *p = conf;
        !           196: 
        !           197:     while (*p) {
        !           198:         int len;
        !           199:         const char *end = strchr(p, ':');
        !           200: 
        !           201:         if (end) {
        !           202:             len = end - p;
        !           203:         } else {
        !           204:             len = strlen(p);
        !           205:         }
        !           206: 
        !           207:         if (strncmp(p, "id=", 3) == 0) {
        !           208:             len -= 3;
        !           209:             strncpy(clientname, p + 3, len);
        !           210:             clientname[len] = '\0';
        !           211:             return clientname;
        !           212:         }
        !           213:         if (end == NULL) {
        !           214:             break;
        !           215:         }
        !           216:         p = end + 1;
        !           217:     }
        !           218:     return NULL;
        !           219: }
        !           220: 
1.1.1.2   root      221: static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
1.1       root      222: {
1.1.1.2   root      223:     char *p, *buf;
                    224:     char name[RBD_MAX_CONF_NAME_SIZE];
                    225:     char value[RBD_MAX_CONF_VAL_SIZE];
                    226:     int ret = 0;
1.1       root      227: 
1.1.1.3 ! root      228:     buf = g_strdup(conf);
1.1.1.2   root      229:     p = buf;
1.1       root      230: 
1.1.1.2   root      231:     while (p) {
                    232:         ret = qemu_rbd_next_tok(name, sizeof(name), p,
                    233:                                 '=', "conf option name", &p);
                    234:         if (ret < 0) {
                    235:             break;
                    236:         }
1.1.1.3 ! root      237:         qemu_rbd_unescape(name);
1.1       root      238: 
1.1.1.2   root      239:         if (!p) {
                    240:             error_report("conf option %s has no value", name);
                    241:             ret = -EINVAL;
                    242:             break;
                    243:         }
1.1       root      244: 
1.1.1.2   root      245:         ret = qemu_rbd_next_tok(value, sizeof(value), p,
                    246:                                 ':', "conf option value", &p);
                    247:         if (ret < 0) {
                    248:             break;
                    249:         }
1.1.1.3 ! root      250:         qemu_rbd_unescape(value);
1.1       root      251: 
1.1.1.3 ! root      252:         if (strcmp(name, "conf") == 0) {
        !           253:             ret = rados_conf_read_file(cluster, value);
1.1.1.2   root      254:             if (ret < 0) {
1.1.1.3 ! root      255:                 error_report("error reading conf file %s", value);
1.1.1.2   root      256:                 break;
                    257:             }
1.1.1.3 ! root      258:         } else if (strcmp(name, "id") == 0) {
        !           259:             /* ignore, this is parsed by qemu_rbd_parse_clientname() */
1.1.1.2   root      260:         } else {
1.1.1.3 ! root      261:             ret = rados_conf_set(cluster, name, value);
1.1.1.2   root      262:             if (ret < 0) {
1.1.1.3 ! root      263:                 error_report("invalid conf option %s", name);
        !           264:                 ret = -EINVAL;
1.1.1.2   root      265:                 break;
                    266:             }
                    267:         }
1.1       root      268:     }
                    269: 
1.1.1.3 ! root      270:     g_free(buf);
1.1       root      271:     return ret;
                    272: }
                    273: 
1.1.1.2   root      274: static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
1.1       root      275: {
                    276:     int64_t bytes = 0;
                    277:     int64_t objsize;
1.1.1.2   root      278:     int obj_order = 0;
                    279:     char pool[RBD_MAX_POOL_NAME_SIZE];
                    280:     char name[RBD_MAX_IMAGE_NAME_SIZE];
                    281:     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
                    282:     char conf[RBD_MAX_CONF_SIZE];
1.1.1.3 ! root      283:     char clientname_buf[RBD_MAX_CONF_SIZE];
        !           284:     char *clientname;
1.1.1.2   root      285:     rados_t cluster;
                    286:     rados_ioctx_t io_ctx;
1.1       root      287:     int ret;
                    288: 
1.1.1.2   root      289:     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                    290:                            snap_buf, sizeof(snap_buf),
                    291:                            name, sizeof(name),
                    292:                            conf, sizeof(conf)) < 0) {
1.1       root      293:         return -EINVAL;
                    294:     }
                    295: 
                    296:     /* Read out options */
                    297:     while (options && options->name) {
                    298:         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
                    299:             bytes = options->value.n;
                    300:         } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
                    301:             if (options->value.n) {
                    302:                 objsize = options->value.n;
                    303:                 if ((objsize - 1) & objsize) {    /* not a power of 2? */
                    304:                     error_report("obj size needs to be power of 2");
                    305:                     return -EINVAL;
                    306:                 }
                    307:                 if (objsize < 4096) {
                    308:                     error_report("obj size too small");
                    309:                     return -EINVAL;
                    310:                 }
1.1.1.2   root      311:                 obj_order = ffs(objsize) - 1;
1.1       root      312:             }
                    313:         }
                    314:         options++;
                    315:     }
                    316: 
1.1.1.3 ! root      317:     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
        !           318:     if (rados_create(&cluster, clientname) < 0) {
1.1       root      319:         error_report("error initializing");
                    320:         return -EIO;
                    321:     }
                    322: 
1.1.1.2   root      323:     if (strstr(conf, "conf=") == NULL) {
1.1.1.3 ! root      324:         /* try default location, but ignore failure */
        !           325:         rados_conf_read_file(cluster, NULL);
1.1       root      326:     }
                    327: 
1.1.1.2   root      328:     if (conf[0] != '\0' &&
                    329:         qemu_rbd_set_conf(cluster, conf) < 0) {
                    330:         error_report("error setting config options");
                    331:         rados_shutdown(cluster);
                    332:         return -EIO;
1.1       root      333:     }
                    334: 
1.1.1.2   root      335:     if (rados_connect(cluster) < 0) {
                    336:         error_report("error connecting");
                    337:         rados_shutdown(cluster);
1.1       root      338:         return -EIO;
                    339:     }
                    340: 
1.1.1.2   root      341:     if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
                    342:         error_report("error opening pool %s", pool);
                    343:         rados_shutdown(cluster);
                    344:         return -EIO;
1.1       root      345:     }
                    346: 
1.1.1.2   root      347:     ret = rbd_create(io_ctx, name, bytes, &obj_order);
                    348:     rados_ioctx_destroy(io_ctx);
                    349:     rados_shutdown(cluster);
1.1       root      350: 
                    351:     return ret;
                    352: }
                    353: 
                    354: /*
1.1.1.2   root      355:  * This aio completion is being called from qemu_rbd_aio_event_reader()
                    356:  * and runs in qemu context. It schedules a bh, but just in case the aio
1.1       root      357:  * was not cancelled before.
                    358:  */
1.1.1.2   root      359: static void qemu_rbd_complete_aio(RADOSCB *rcb)
1.1       root      360: {
                    361:     RBDAIOCB *acb = rcb->acb;
                    362:     int64_t r;
                    363: 
                    364:     if (acb->cancelled) {
1.1.1.2   root      365:         qemu_vfree(acb->bounce);
                    366:         qemu_aio_release(acb);
1.1       root      367:         goto done;
                    368:     }
                    369: 
                    370:     r = rcb->ret;
                    371: 
                    372:     if (acb->write) {
                    373:         if (r < 0) {
                    374:             acb->ret = r;
                    375:             acb->error = 1;
                    376:         } else if (!acb->error) {
1.1.1.2   root      377:             acb->ret = rcb->size;
1.1       root      378:         }
                    379:     } else {
1.1.1.2   root      380:         if (r < 0) {
                    381:             memset(rcb->buf, 0, rcb->size);
1.1       root      382:             acb->ret = r;
                    383:             acb->error = 1;
1.1.1.2   root      384:         } else if (r < rcb->size) {
                    385:             memset(rcb->buf + r, 0, rcb->size - r);
1.1       root      386:             if (!acb->error) {
1.1.1.2   root      387:                 acb->ret = rcb->size;
1.1       root      388:             }
                    389:         } else if (!acb->error) {
1.1.1.2   root      390:             acb->ret = r;
1.1       root      391:         }
                    392:     }
                    393:     /* Note that acb->bh can be NULL in case where the aio was cancelled */
1.1.1.2   root      394:     acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
                    395:     qemu_bh_schedule(acb->bh);
1.1       root      396: done:
1.1.1.3 ! root      397:     g_free(rcb);
1.1       root      398: }
                    399: 
                    400: /*
                    401:  * aio fd read handler. It runs in the qemu context and calls the
                    402:  * completion handling of completed rados aio operations.
                    403:  */
1.1.1.2   root      404: static void qemu_rbd_aio_event_reader(void *opaque)
1.1       root      405: {
                    406:     BDRVRBDState *s = opaque;
                    407: 
                    408:     ssize_t ret;
                    409: 
                    410:     do {
                    411:         char *p = (char *)&s->event_rcb;
                    412: 
                    413:         /* now read the rcb pointer that was sent from a non qemu thread */
1.1.1.3 ! root      414:         ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
        !           415:                    sizeof(s->event_rcb) - s->event_reader_pos);
        !           416:         if (ret > 0) {
        !           417:             s->event_reader_pos += ret;
        !           418:             if (s->event_reader_pos == sizeof(s->event_rcb)) {
        !           419:                 s->event_reader_pos = 0;
        !           420:                 qemu_rbd_complete_aio(s->event_rcb);
        !           421:                 s->qemu_aio_count--;
1.1       root      422:             }
                    423:         }
                    424:     } while (ret < 0 && errno == EINTR);
                    425: }
                    426: 
1.1.1.2   root      427: static int qemu_rbd_aio_flush_cb(void *opaque)
1.1       root      428: {
                    429:     BDRVRBDState *s = opaque;
                    430: 
                    431:     return (s->qemu_aio_count > 0);
                    432: }
                    433: 
1.1.1.2   root      434: static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
1.1       root      435: {
                    436:     BDRVRBDState *s = bs->opaque;
1.1.1.2   root      437:     char pool[RBD_MAX_POOL_NAME_SIZE];
                    438:     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
                    439:     char conf[RBD_MAX_CONF_SIZE];
1.1.1.3 ! root      440:     char clientname_buf[RBD_MAX_CONF_SIZE];
        !           441:     char *clientname;
1.1       root      442:     int r;
                    443: 
1.1.1.2   root      444:     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                    445:                            snap_buf, sizeof(snap_buf),
                    446:                            s->name, sizeof(s->name),
                    447:                            conf, sizeof(conf)) < 0) {
1.1       root      448:         return -EINVAL;
                    449:     }
                    450: 
1.1.1.3 ! root      451:     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
        !           452:     r = rados_create(&s->cluster, clientname);
1.1.1.2   root      453:     if (r < 0) {
1.1       root      454:         error_report("error initializing");
                    455:         return r;
                    456:     }
                    457: 
1.1.1.3 ! root      458:     s->snap = NULL;
        !           459:     if (snap_buf[0] != '\0') {
        !           460:         s->snap = g_strdup(snap_buf);
        !           461:     }
        !           462: 
1.1.1.2   root      463:     if (strstr(conf, "conf=") == NULL) {
1.1.1.3 ! root      464:         /* try default location, but ignore failure */
        !           465:         rados_conf_read_file(s->cluster, NULL);
1.1       root      466:     }
                    467: 
1.1.1.2   root      468:     if (conf[0] != '\0') {
                    469:         r = qemu_rbd_set_conf(s->cluster, conf);
                    470:         if (r < 0) {
                    471:             error_report("error setting config options");
1.1.1.3 ! root      472:             goto failed_shutdown;
1.1.1.2   root      473:         }
1.1       root      474:     }
                    475: 
1.1.1.2   root      476:     r = rados_connect(s->cluster);
                    477:     if (r < 0) {
                    478:         error_report("error connecting");
1.1.1.3 ! root      479:         goto failed_shutdown;
1.1       root      480:     }
                    481: 
1.1.1.2   root      482:     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
                    483:     if (r < 0) {
                    484:         error_report("error opening pool %s", pool);
1.1.1.3 ! root      485:         goto failed_shutdown;
1.1       root      486:     }
                    487: 
1.1.1.2   root      488:     r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
1.1       root      489:     if (r < 0) {
1.1.1.2   root      490:         error_report("error reading header from %s", s->name);
1.1.1.3 ! root      491:         goto failed_open;
1.1       root      492:     }
                    493: 
1.1.1.2   root      494:     bs->read_only = (s->snap != NULL);
1.1       root      495: 
                    496:     s->event_reader_pos = 0;
                    497:     r = qemu_pipe(s->fds);
                    498:     if (r < 0) {
                    499:         error_report("error opening eventfd");
                    500:         goto failed;
                    501:     }
                    502:     fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
                    503:     fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
1.1.1.2   root      504:     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
                    505:                             NULL, qemu_rbd_aio_flush_cb, NULL, s);
1.1       root      506: 
                    507: 
                    508:     return 0;
                    509: 
                    510: failed:
1.1.1.2   root      511:     rbd_close(s->image);
1.1.1.3 ! root      512: failed_open:
1.1.1.2   root      513:     rados_ioctx_destroy(s->io_ctx);
1.1.1.3 ! root      514: failed_shutdown:
1.1.1.2   root      515:     rados_shutdown(s->cluster);
1.1.1.3 ! root      516:     g_free(s->snap);
1.1       root      517:     return r;
                    518: }
                    519: 
1.1.1.2   root      520: static void qemu_rbd_close(BlockDriverState *bs)
1.1       root      521: {
                    522:     BDRVRBDState *s = bs->opaque;
                    523: 
                    524:     close(s->fds[0]);
                    525:     close(s->fds[1]);
                    526:     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
                    527:         NULL);
                    528: 
1.1.1.2   root      529:     rbd_close(s->image);
                    530:     rados_ioctx_destroy(s->io_ctx);
1.1.1.3 ! root      531:     g_free(s->snap);
1.1.1.2   root      532:     rados_shutdown(s->cluster);
1.1       root      533: }
                    534: 
                    535: /*
                    536:  * Cancel aio. Since we don't reference acb in a non qemu threads,
                    537:  * it is safe to access it here.
                    538:  */
1.1.1.2   root      539: static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
1.1       root      540: {
                    541:     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
                    542:     acb->cancelled = 1;
                    543: }
                    544: 
                    545: static AIOPool rbd_aio_pool = {
                    546:     .aiocb_size = sizeof(RBDAIOCB),
1.1.1.2   root      547:     .cancel = qemu_rbd_aio_cancel,
1.1       root      548: };
                    549: 
1.1.1.2   root      550: static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
1.1       root      551: {
1.1.1.2   root      552:     int ret = 0;
1.1       root      553:     while (1) {
                    554:         fd_set wfd;
1.1.1.2   root      555:         int fd = s->fds[RBD_FD_WRITE];
1.1       root      556: 
1.1.1.2   root      557:         /* send the op pointer to the qemu thread that is responsible
                    558:            for the aio/op completion. Must do it in a qemu thread context */
1.1       root      559:         ret = write(fd, (void *)&rcb, sizeof(rcb));
                    560:         if (ret >= 0) {
                    561:             break;
                    562:         }
                    563:         if (errno == EINTR) {
                    564:             continue;
1.1.1.2   root      565:         }
1.1       root      566:         if (errno != EAGAIN) {
                    567:             break;
1.1.1.2   root      568:         }
1.1       root      569: 
                    570:         FD_ZERO(&wfd);
                    571:         FD_SET(fd, &wfd);
                    572:         do {
                    573:             ret = select(fd + 1, NULL, &wfd, NULL, NULL);
                    574:         } while (ret < 0 && errno == EINTR);
                    575:     }
                    576: 
1.1.1.2   root      577:     return ret;
                    578: }
                    579: 
                    580: /*
                    581:  * This is the callback function for rbd_aio_read and _write
                    582:  *
                    583:  * Note: this function is being called from a non qemu thread so
                    584:  * we need to be careful about what we do here. Generally we only
                    585:  * write to the block notification pipe, and do the rest of the
                    586:  * io completion handling from qemu_rbd_aio_event_reader() which
                    587:  * runs in a qemu context.
                    588:  */
                    589: static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
                    590: {
                    591:     int ret;
                    592:     rcb->ret = rbd_aio_get_return_value(c);
                    593:     rbd_aio_release(c);
                    594:     ret = qemu_rbd_send_pipe(rcb->s, rcb);
1.1       root      595:     if (ret < 0) {
1.1.1.2   root      596:         error_report("failed writing to acb->s->fds");
1.1.1.3 ! root      597:         g_free(rcb);
1.1       root      598:     }
                    599: }
                    600: 
1.1.1.2   root      601: /* Callback when all queued rbd_aio requests are complete */
1.1       root      602: 
                    603: static void rbd_aio_bh_cb(void *opaque)
                    604: {
                    605:     RBDAIOCB *acb = opaque;
                    606: 
                    607:     if (!acb->write) {
                    608:         qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
                    609:     }
                    610:     qemu_vfree(acb->bounce);
                    611:     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
                    612:     qemu_bh_delete(acb->bh);
                    613:     acb->bh = NULL;
                    614: 
                    615:     qemu_aio_release(acb);
                    616: }
                    617: 
                    618: static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
                    619:                                            int64_t sector_num,
                    620:                                            QEMUIOVector *qiov,
                    621:                                            int nb_sectors,
                    622:                                            BlockDriverCompletionFunc *cb,
                    623:                                            void *opaque, int write)
                    624: {
                    625:     RBDAIOCB *acb;
                    626:     RADOSCB *rcb;
1.1.1.2   root      627:     rbd_completion_t c;
1.1       root      628:     int64_t off, size;
                    629:     char *buf;
1.1.1.2   root      630:     int r;
1.1       root      631: 
                    632:     BDRVRBDState *s = bs->opaque;
                    633: 
                    634:     acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
1.1.1.2   root      635:     if (!acb) {
                    636:         return NULL;
                    637:     }
1.1       root      638:     acb->write = write;
                    639:     acb->qiov = qiov;
                    640:     acb->bounce = qemu_blockalign(bs, qiov->size);
                    641:     acb->ret = 0;
                    642:     acb->error = 0;
                    643:     acb->s = s;
                    644:     acb->cancelled = 0;
                    645:     acb->bh = NULL;
                    646: 
                    647:     if (write) {
                    648:         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
                    649:     }
                    650: 
                    651:     buf = acb->bounce;
                    652: 
                    653:     off = sector_num * BDRV_SECTOR_SIZE;
                    654:     size = nb_sectors * BDRV_SECTOR_SIZE;
                    655: 
1.1.1.2   root      656:     s->qemu_aio_count++; /* All the RADOSCB */
1.1       root      657: 
1.1.1.3 ! root      658:     rcb = g_malloc(sizeof(RADOSCB));
1.1.1.2   root      659:     rcb->done = 0;
                    660:     rcb->acb = acb;
                    661:     rcb->buf = buf;
                    662:     rcb->s = acb->s;
                    663:     rcb->size = size;
                    664:     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
                    665:     if (r < 0) {
                    666:         goto failed;
                    667:     }
                    668: 
                    669:     if (write) {
                    670:         r = rbd_aio_write(s->image, off, size, buf, c);
                    671:     } else {
                    672:         r = rbd_aio_read(s->image, off, size, buf, c);
                    673:     }
1.1       root      674: 
1.1.1.2   root      675:     if (r < 0) {
                    676:         goto failed;
1.1       root      677:     }
                    678: 
                    679:     return &acb->common;
1.1.1.2   root      680: 
                    681: failed:
1.1.1.3 ! root      682:     g_free(rcb);
1.1.1.2   root      683:     s->qemu_aio_count--;
                    684:     qemu_aio_release(acb);
                    685:     return NULL;
1.1       root      686: }
                    687: 
1.1.1.2   root      688: static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                    689:                                             int64_t sector_num,
                    690:                                             QEMUIOVector *qiov,
                    691:                                             int nb_sectors,
                    692:                                             BlockDriverCompletionFunc *cb,
                    693:                                             void *opaque)
1.1       root      694: {
                    695:     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
                    696: }
                    697: 
1.1.1.2   root      698: static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                    699:                                              int64_t sector_num,
                    700:                                              QEMUIOVector *qiov,
                    701:                                              int nb_sectors,
                    702:                                              BlockDriverCompletionFunc *cb,
                    703:                                              void *opaque)
1.1       root      704: {
                    705:     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
                    706: }
                    707: 
1.1.1.3 ! root      708: static int qemu_rbd_co_flush(BlockDriverState *bs)
        !           709: {
        !           710: #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
        !           711:     /* rbd_flush added in 0.1.1 */
        !           712:     BDRVRBDState *s = bs->opaque;
        !           713:     return rbd_flush(s->image);
        !           714: #else
        !           715:     return 0;
        !           716: #endif
        !           717: }
        !           718: 
1.1.1.2   root      719: static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1.1       root      720: {
                    721:     BDRVRBDState *s = bs->opaque;
1.1.1.2   root      722:     rbd_image_info_t info;
                    723:     int r;
                    724: 
                    725:     r = rbd_stat(s->image, &info, sizeof(info));
                    726:     if (r < 0) {
                    727:         return r;
                    728:     }
                    729: 
                    730:     bdi->cluster_size = info.obj_size;
1.1       root      731:     return 0;
                    732: }
                    733: 
1.1.1.2   root      734: static int64_t qemu_rbd_getlength(BlockDriverState *bs)
                    735: {
                    736:     BDRVRBDState *s = bs->opaque;
                    737:     rbd_image_info_t info;
                    738:     int r;
                    739: 
                    740:     r = rbd_stat(s->image, &info, sizeof(info));
                    741:     if (r < 0) {
                    742:         return r;
                    743:     }
                    744: 
                    745:     return info.size;
                    746: }
                    747: 
                    748: static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
1.1       root      749: {
                    750:     BDRVRBDState *s = bs->opaque;
1.1.1.2   root      751:     int r;
                    752: 
                    753:     r = rbd_resize(s->image, offset);
                    754:     if (r < 0) {
                    755:         return r;
                    756:     }
1.1       root      757: 
1.1.1.2   root      758:     return 0;
1.1       root      759: }
                    760: 
1.1.1.2   root      761: static int qemu_rbd_snap_create(BlockDriverState *bs,
                    762:                                 QEMUSnapshotInfo *sn_info)
1.1       root      763: {
                    764:     BDRVRBDState *s = bs->opaque;
                    765:     int r;
                    766: 
                    767:     if (sn_info->name[0] == '\0') {
                    768:         return -EINVAL; /* we need a name for rbd snapshots */
                    769:     }
                    770: 
                    771:     /*
                    772:      * rbd snapshots are using the name as the user controlled unique identifier
                    773:      * we can't use the rbd snapid for that purpose, as it can't be set
                    774:      */
                    775:     if (sn_info->id_str[0] != '\0' &&
                    776:         strcmp(sn_info->id_str, sn_info->name) != 0) {
                    777:         return -EINVAL;
                    778:     }
                    779: 
                    780:     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
                    781:         return -ERANGE;
                    782:     }
                    783: 
1.1.1.2   root      784:     r = rbd_snap_create(s->image, sn_info->name);
1.1       root      785:     if (r < 0) {
1.1.1.2   root      786:         error_report("failed to create snap: %s", strerror(-r));
1.1       root      787:         return r;
                    788:     }
                    789: 
                    790:     return 0;
                    791: }
                    792: 
1.1.1.2   root      793: static int qemu_rbd_snap_list(BlockDriverState *bs,
                    794:                               QEMUSnapshotInfo **psn_tab)
1.1       root      795: {
                    796:     BDRVRBDState *s = bs->opaque;
                    797:     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1.1.1.2   root      798:     int i, snap_count;
                    799:     rbd_snap_info_t *snaps;
                    800:     int max_snaps = RBD_MAX_SNAPS;
1.1       root      801: 
1.1.1.2   root      802:     do {
1.1.1.3 ! root      803:         snaps = g_malloc(sizeof(*snaps) * max_snaps);
1.1.1.2   root      804:         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
                    805:         if (snap_count < 0) {
1.1.1.3 ! root      806:             g_free(snaps);
1.1       root      807:         }
1.1.1.2   root      808:     } while (snap_count == -ERANGE);
1.1       root      809: 
1.1.1.2   root      810:     if (snap_count <= 0) {
1.1.1.3 ! root      811:         goto done;
1.1       root      812:     }
                    813: 
1.1.1.3 ! root      814:     sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
1.1       root      815: 
1.1.1.2   root      816:     for (i = 0; i < snap_count; i++) {
                    817:         const char *snap_name = snaps[i].name;
1.1       root      818: 
                    819:         sn_info = sn_tab + i;
                    820:         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
                    821:         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
                    822: 
1.1.1.2   root      823:         sn_info->vm_state_size = snaps[i].size;
1.1       root      824:         sn_info->date_sec = 0;
                    825:         sn_info->date_nsec = 0;
                    826:         sn_info->vm_clock_nsec = 0;
                    827:     }
1.1.1.2   root      828:     rbd_snap_list_end(snaps);
                    829: 
1.1.1.3 ! root      830:  done:
1.1       root      831:     *psn_tab = sn_tab;
                    832:     return snap_count;
                    833: }
                    834: 
1.1.1.2   root      835: static QEMUOptionParameter qemu_rbd_create_options[] = {
1.1       root      836:     {
                    837:      .name = BLOCK_OPT_SIZE,
                    838:      .type = OPT_SIZE,
                    839:      .help = "Virtual disk size"
                    840:     },
                    841:     {
                    842:      .name = BLOCK_OPT_CLUSTER_SIZE,
                    843:      .type = OPT_SIZE,
                    844:      .help = "RBD object size"
                    845:     },
                    846:     {NULL}
                    847: };
                    848: 
                    849: static BlockDriver bdrv_rbd = {
                    850:     .format_name        = "rbd",
                    851:     .instance_size      = sizeof(BDRVRBDState),
1.1.1.2   root      852:     .bdrv_file_open     = qemu_rbd_open,
                    853:     .bdrv_close         = qemu_rbd_close,
                    854:     .bdrv_create        = qemu_rbd_create,
                    855:     .bdrv_get_info      = qemu_rbd_getinfo,
                    856:     .create_options     = qemu_rbd_create_options,
                    857:     .bdrv_getlength     = qemu_rbd_getlength,
                    858:     .bdrv_truncate      = qemu_rbd_truncate,
1.1       root      859:     .protocol_name      = "rbd",
                    860: 
1.1.1.3 ! root      861:     .bdrv_aio_readv         = qemu_rbd_aio_readv,
        !           862:     .bdrv_aio_writev        = qemu_rbd_aio_writev,
        !           863:     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1.1       root      864: 
1.1.1.3 ! root      865:     .bdrv_snapshot_create   = qemu_rbd_snap_create,
        !           866:     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1.1       root      867: };
                    868: 
                    869: static void bdrv_rbd_init(void)
                    870: {
                    871:     bdrv_register(&bdrv_rbd);
                    872: }
                    873: 
                    874: block_init(bdrv_rbd_init);

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.