Annotation of qemu/block/rbd.c, revision 1.1.1.4

1.1       root        1: /*
                      2:  * QEMU Block driver for RADOS (Ceph)
                      3:  *
1.1.1.2   root        4:  * Copyright (C) 2010-2011 Christian Brunner <[email protected]>,
                      5:  *                         Josh Durgin <[email protected]>
1.1       root        6:  *
                      7:  * This work is licensed under the terms of the GNU GPL, version 2.  See
                      8:  * the COPYING file in the top-level directory.
                      9:  *
1.1.1.4 ! root       10:  * Contributions after 2012-01-13 are licensed under the terms of the
        !            11:  * GNU GPL, version 2 or (at your option) any later version.
1.1       root       12:  */
                     13: 
1.1.1.2   root       14: #include <inttypes.h>
                     15: 
1.1       root       16: #include "qemu-common.h"
                     17: #include "qemu-error.h"
                     18: #include "block_int.h"
                     19: 
1.1.1.2   root       20: #include <rbd/librbd.h>
1.1       root       21: 
                     22: /*
                     23:  * When specifying the image filename use:
                     24:  *
1.1.1.2   root       25:  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
1.1       root       26:  *
1.1.1.3   root       27:  * poolname must be the name of an existing rados pool.
1.1       root       28:  *
1.1.1.3   root       29:  * devicename is the name of the rbd image.
1.1       root       30:  *
1.1.1.3   root       31:  * Each option given is used to configure rados, and may be any valid
                     32:  * Ceph option, "id", or "conf".
1.1.1.2   root       33:  *
1.1.1.3   root       34:  * The "id" option indicates what user we should authenticate as to
                     35:  * the Ceph cluster.  If it is excluded we will use the Ceph default
                     36:  * (normally 'admin').
1.1       root       37:  *
1.1.1.3   root       38:  * The "conf" option specifies a Ceph configuration file to read.  If
                     39:  * it is not specified, we will read from the default Ceph locations
                     40:  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
                     41:  * file, specify conf=/dev/null.
1.1       root       42:  *
1.1.1.3   root       43:  * Configuration values containing :, @, or = can be escaped with a
                     44:  * leading "\".
1.1       root       45:  */
                     46: 
1.1.1.4 ! root       47: /* rbd_aio_discard added in 0.1.2 */
        !            48: #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
        !            49: #define LIBRBD_SUPPORTS_DISCARD
        !            50: #else
        !            51: #undef LIBRBD_SUPPORTS_DISCARD
        !            52: #endif
        !            53: 
1.1       root       54: #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
                     55: 
1.1.1.2   root       56: #define RBD_MAX_CONF_NAME_SIZE 128
                     57: #define RBD_MAX_CONF_VAL_SIZE 512
                     58: #define RBD_MAX_CONF_SIZE 1024
                     59: #define RBD_MAX_POOL_NAME_SIZE 128
                     60: #define RBD_MAX_SNAP_NAME_SIZE 128
                     61: #define RBD_MAX_SNAPS 100
                     62: 
1.1.1.4 ! root       63: typedef enum {
        !            64:     RBD_AIO_READ,
        !            65:     RBD_AIO_WRITE,
        !            66:     RBD_AIO_DISCARD
        !            67: } RBDAIOCmd;
        !            68: 
1.1       root       69: typedef struct RBDAIOCB {
                     70:     BlockDriverAIOCB common;
                     71:     QEMUBH *bh;
                     72:     int ret;
                     73:     QEMUIOVector *qiov;
                     74:     char *bounce;
1.1.1.4 ! root       75:     RBDAIOCmd cmd;
1.1       root       76:     int64_t sector_num;
                     77:     int error;
                     78:     struct BDRVRBDState *s;
                     79:     int cancelled;
                     80: } RBDAIOCB;
                     81: 
                     82: typedef struct RADOSCB {
                     83:     int rcbid;
                     84:     RBDAIOCB *acb;
                     85:     struct BDRVRBDState *s;
                     86:     int done;
1.1.1.2   root       87:     int64_t size;
1.1       root       88:     char *buf;
                     89:     int ret;
                     90: } RADOSCB;
                     91: 
                     92: #define RBD_FD_READ 0
                     93: #define RBD_FD_WRITE 1
                     94: 
                     95: typedef struct BDRVRBDState {
                     96:     int fds[2];
1.1.1.2   root       97:     rados_t cluster;
                     98:     rados_ioctx_t io_ctx;
                     99:     rbd_image_t image;
                    100:     char name[RBD_MAX_IMAGE_NAME_SIZE];
1.1       root      101:     int qemu_aio_count;
1.1.1.2   root      102:     char *snap;
1.1       root      103:     int event_reader_pos;
                    104:     RADOSCB *event_rcb;
                    105: } BDRVRBDState;
                    106: 
                    107: static void rbd_aio_bh_cb(void *opaque);
                    108: 
1.1.1.2   root      109: static int qemu_rbd_next_tok(char *dst, int dst_len,
                    110:                              char *src, char delim,
                    111:                              const char *name,
                    112:                              char **p)
1.1       root      113: {
                    114:     int l;
                    115:     char *end;
                    116: 
                    117:     *p = NULL;
                    118: 
                    119:     if (delim != '\0') {
1.1.1.3   root      120:         for (end = src; *end; ++end) {
                    121:             if (*end == delim) {
                    122:                 break;
                    123:             }
                    124:             if (*end == '\\' && end[1] != '\0') {
                    125:                 end++;
                    126:             }
                    127:         }
                    128:         if (*end == delim) {
1.1       root      129:             *p = end + 1;
                    130:             *end = '\0';
                    131:         }
                    132:     }
                    133:     l = strlen(src);
                    134:     if (l >= dst_len) {
                    135:         error_report("%s too long", name);
                    136:         return -EINVAL;
                    137:     } else if (l == 0) {
                    138:         error_report("%s too short", name);
                    139:         return -EINVAL;
                    140:     }
                    141: 
                    142:     pstrcpy(dst, dst_len, src);
                    143: 
                    144:     return 0;
                    145: }
                    146: 
1.1.1.3   root      147: static void qemu_rbd_unescape(char *src)
                    148: {
                    149:     char *p;
                    150: 
                    151:     for (p = src; *src; ++src, ++p) {
                    152:         if (*src == '\\' && src[1] != '\0') {
                    153:             src++;
                    154:         }
                    155:         *p = *src;
                    156:     }
                    157:     *p = '\0';
                    158: }
                    159: 
1.1.1.2   root      160: static int qemu_rbd_parsename(const char *filename,
                    161:                               char *pool, int pool_len,
                    162:                               char *snap, int snap_len,
                    163:                               char *name, int name_len,
                    164:                               char *conf, int conf_len)
1.1       root      165: {
                    166:     const char *start;
                    167:     char *p, *buf;
                    168:     int ret;
                    169: 
                    170:     if (!strstart(filename, "rbd:", &start)) {
                    171:         return -EINVAL;
                    172:     }
                    173: 
1.1.1.3   root      174:     buf = g_strdup(start);
1.1       root      175:     p = buf;
1.1.1.2   root      176:     *snap = '\0';
                    177:     *conf = '\0';
1.1       root      178: 
1.1.1.2   root      179:     ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
1.1       root      180:     if (ret < 0 || !p) {
                    181:         ret = -EINVAL;
                    182:         goto done;
                    183:     }
1.1.1.3   root      184:     qemu_rbd_unescape(pool);
1.1.1.2   root      185: 
                    186:     if (strchr(p, '@')) {
                    187:         ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
                    188:         if (ret < 0) {
                    189:             goto done;
                    190:         }
                    191:         ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
1.1.1.3   root      192:         qemu_rbd_unescape(snap);
1.1.1.2   root      193:     } else {
                    194:         ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
1.1       root      195:     }
1.1.1.3   root      196:     qemu_rbd_unescape(name);
1.1.1.2   root      197:     if (ret < 0 || !p) {
1.1       root      198:         goto done;
                    199:     }
                    200: 
1.1.1.2   root      201:     ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
1.1       root      202: 
                    203: done:
1.1.1.3   root      204:     g_free(buf);
1.1       root      205:     return ret;
                    206: }
                    207: 
1.1.1.3   root      208: static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
                    209: {
                    210:     const char *p = conf;
                    211: 
                    212:     while (*p) {
                    213:         int len;
                    214:         const char *end = strchr(p, ':');
                    215: 
                    216:         if (end) {
                    217:             len = end - p;
                    218:         } else {
                    219:             len = strlen(p);
                    220:         }
                    221: 
                    222:         if (strncmp(p, "id=", 3) == 0) {
                    223:             len -= 3;
                    224:             strncpy(clientname, p + 3, len);
                    225:             clientname[len] = '\0';
                    226:             return clientname;
                    227:         }
                    228:         if (end == NULL) {
                    229:             break;
                    230:         }
                    231:         p = end + 1;
                    232:     }
                    233:     return NULL;
                    234: }
                    235: 
1.1.1.2   root      236: static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
1.1       root      237: {
1.1.1.2   root      238:     char *p, *buf;
                    239:     char name[RBD_MAX_CONF_NAME_SIZE];
                    240:     char value[RBD_MAX_CONF_VAL_SIZE];
                    241:     int ret = 0;
1.1       root      242: 
1.1.1.3   root      243:     buf = g_strdup(conf);
1.1.1.2   root      244:     p = buf;
1.1       root      245: 
1.1.1.2   root      246:     while (p) {
                    247:         ret = qemu_rbd_next_tok(name, sizeof(name), p,
                    248:                                 '=', "conf option name", &p);
                    249:         if (ret < 0) {
                    250:             break;
                    251:         }
1.1.1.3   root      252:         qemu_rbd_unescape(name);
1.1       root      253: 
1.1.1.2   root      254:         if (!p) {
                    255:             error_report("conf option %s has no value", name);
                    256:             ret = -EINVAL;
                    257:             break;
                    258:         }
1.1       root      259: 
1.1.1.2   root      260:         ret = qemu_rbd_next_tok(value, sizeof(value), p,
                    261:                                 ':', "conf option value", &p);
                    262:         if (ret < 0) {
                    263:             break;
                    264:         }
1.1.1.3   root      265:         qemu_rbd_unescape(value);
1.1       root      266: 
1.1.1.3   root      267:         if (strcmp(name, "conf") == 0) {
                    268:             ret = rados_conf_read_file(cluster, value);
1.1.1.2   root      269:             if (ret < 0) {
1.1.1.3   root      270:                 error_report("error reading conf file %s", value);
1.1.1.2   root      271:                 break;
                    272:             }
1.1.1.3   root      273:         } else if (strcmp(name, "id") == 0) {
                    274:             /* ignore, this is parsed by qemu_rbd_parse_clientname() */
1.1.1.2   root      275:         } else {
1.1.1.3   root      276:             ret = rados_conf_set(cluster, name, value);
1.1.1.2   root      277:             if (ret < 0) {
1.1.1.3   root      278:                 error_report("invalid conf option %s", name);
                    279:                 ret = -EINVAL;
1.1.1.2   root      280:                 break;
                    281:             }
                    282:         }
1.1       root      283:     }
                    284: 
1.1.1.3   root      285:     g_free(buf);
1.1       root      286:     return ret;
                    287: }
                    288: 
1.1.1.2   root      289: static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
1.1       root      290: {
                    291:     int64_t bytes = 0;
                    292:     int64_t objsize;
1.1.1.2   root      293:     int obj_order = 0;
                    294:     char pool[RBD_MAX_POOL_NAME_SIZE];
                    295:     char name[RBD_MAX_IMAGE_NAME_SIZE];
                    296:     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
                    297:     char conf[RBD_MAX_CONF_SIZE];
1.1.1.3   root      298:     char clientname_buf[RBD_MAX_CONF_SIZE];
                    299:     char *clientname;
1.1.1.2   root      300:     rados_t cluster;
                    301:     rados_ioctx_t io_ctx;
1.1       root      302:     int ret;
                    303: 
1.1.1.2   root      304:     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                    305:                            snap_buf, sizeof(snap_buf),
                    306:                            name, sizeof(name),
                    307:                            conf, sizeof(conf)) < 0) {
1.1       root      308:         return -EINVAL;
                    309:     }
                    310: 
                    311:     /* Read out options */
                    312:     while (options && options->name) {
                    313:         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
                    314:             bytes = options->value.n;
                    315:         } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
                    316:             if (options->value.n) {
                    317:                 objsize = options->value.n;
                    318:                 if ((objsize - 1) & objsize) {    /* not a power of 2? */
                    319:                     error_report("obj size needs to be power of 2");
                    320:                     return -EINVAL;
                    321:                 }
                    322:                 if (objsize < 4096) {
                    323:                     error_report("obj size too small");
                    324:                     return -EINVAL;
                    325:                 }
1.1.1.2   root      326:                 obj_order = ffs(objsize) - 1;
1.1       root      327:             }
                    328:         }
                    329:         options++;
                    330:     }
                    331: 
1.1.1.3   root      332:     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
                    333:     if (rados_create(&cluster, clientname) < 0) {
1.1       root      334:         error_report("error initializing");
                    335:         return -EIO;
                    336:     }
                    337: 
1.1.1.2   root      338:     if (strstr(conf, "conf=") == NULL) {
1.1.1.3   root      339:         /* try default location, but ignore failure */
                    340:         rados_conf_read_file(cluster, NULL);
1.1       root      341:     }
                    342: 
1.1.1.2   root      343:     if (conf[0] != '\0' &&
                    344:         qemu_rbd_set_conf(cluster, conf) < 0) {
                    345:         error_report("error setting config options");
                    346:         rados_shutdown(cluster);
                    347:         return -EIO;
1.1       root      348:     }
                    349: 
1.1.1.2   root      350:     if (rados_connect(cluster) < 0) {
                    351:         error_report("error connecting");
                    352:         rados_shutdown(cluster);
1.1       root      353:         return -EIO;
                    354:     }
                    355: 
1.1.1.2   root      356:     if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
                    357:         error_report("error opening pool %s", pool);
                    358:         rados_shutdown(cluster);
                    359:         return -EIO;
1.1       root      360:     }
                    361: 
1.1.1.2   root      362:     ret = rbd_create(io_ctx, name, bytes, &obj_order);
                    363:     rados_ioctx_destroy(io_ctx);
                    364:     rados_shutdown(cluster);
1.1       root      365: 
                    366:     return ret;
                    367: }
                    368: 
                    369: /*
1.1.1.2   root      370:  * This aio completion is being called from qemu_rbd_aio_event_reader()
                    371:  * and runs in qemu context. It schedules a bh, but just in case the aio
1.1       root      372:  * was not cancelled before.
                    373:  */
1.1.1.2   root      374: static void qemu_rbd_complete_aio(RADOSCB *rcb)
1.1       root      375: {
                    376:     RBDAIOCB *acb = rcb->acb;
                    377:     int64_t r;
                    378: 
                    379:     if (acb->cancelled) {
1.1.1.2   root      380:         qemu_vfree(acb->bounce);
                    381:         qemu_aio_release(acb);
1.1       root      382:         goto done;
                    383:     }
                    384: 
                    385:     r = rcb->ret;
                    386: 
1.1.1.4 ! root      387:     if (acb->cmd == RBD_AIO_WRITE ||
        !           388:         acb->cmd == RBD_AIO_DISCARD) {
1.1       root      389:         if (r < 0) {
                    390:             acb->ret = r;
                    391:             acb->error = 1;
                    392:         } else if (!acb->error) {
1.1.1.2   root      393:             acb->ret = rcb->size;
1.1       root      394:         }
                    395:     } else {
1.1.1.2   root      396:         if (r < 0) {
                    397:             memset(rcb->buf, 0, rcb->size);
1.1       root      398:             acb->ret = r;
                    399:             acb->error = 1;
1.1.1.2   root      400:         } else if (r < rcb->size) {
                    401:             memset(rcb->buf + r, 0, rcb->size - r);
1.1       root      402:             if (!acb->error) {
1.1.1.2   root      403:                 acb->ret = rcb->size;
1.1       root      404:             }
                    405:         } else if (!acb->error) {
1.1.1.2   root      406:             acb->ret = r;
1.1       root      407:         }
                    408:     }
                    409:     /* Note that acb->bh can be NULL in case where the aio was cancelled */
1.1.1.2   root      410:     acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
                    411:     qemu_bh_schedule(acb->bh);
1.1       root      412: done:
1.1.1.3   root      413:     g_free(rcb);
1.1       root      414: }
                    415: 
                    416: /*
                    417:  * aio fd read handler. It runs in the qemu context and calls the
                    418:  * completion handling of completed rados aio operations.
                    419:  */
1.1.1.2   root      420: static void qemu_rbd_aio_event_reader(void *opaque)
1.1       root      421: {
                    422:     BDRVRBDState *s = opaque;
                    423: 
                    424:     ssize_t ret;
                    425: 
                    426:     do {
                    427:         char *p = (char *)&s->event_rcb;
                    428: 
                    429:         /* now read the rcb pointer that was sent from a non qemu thread */
1.1.1.3   root      430:         ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
                    431:                    sizeof(s->event_rcb) - s->event_reader_pos);
                    432:         if (ret > 0) {
                    433:             s->event_reader_pos += ret;
                    434:             if (s->event_reader_pos == sizeof(s->event_rcb)) {
                    435:                 s->event_reader_pos = 0;
                    436:                 qemu_rbd_complete_aio(s->event_rcb);
                    437:                 s->qemu_aio_count--;
1.1       root      438:             }
                    439:         }
                    440:     } while (ret < 0 && errno == EINTR);
                    441: }
                    442: 
1.1.1.2   root      443: static int qemu_rbd_aio_flush_cb(void *opaque)
1.1       root      444: {
                    445:     BDRVRBDState *s = opaque;
                    446: 
                    447:     return (s->qemu_aio_count > 0);
                    448: }
                    449: 
1.1.1.2   root      450: static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
1.1       root      451: {
                    452:     BDRVRBDState *s = bs->opaque;
1.1.1.2   root      453:     char pool[RBD_MAX_POOL_NAME_SIZE];
                    454:     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
                    455:     char conf[RBD_MAX_CONF_SIZE];
1.1.1.3   root      456:     char clientname_buf[RBD_MAX_CONF_SIZE];
                    457:     char *clientname;
1.1       root      458:     int r;
                    459: 
1.1.1.2   root      460:     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                    461:                            snap_buf, sizeof(snap_buf),
                    462:                            s->name, sizeof(s->name),
                    463:                            conf, sizeof(conf)) < 0) {
1.1       root      464:         return -EINVAL;
                    465:     }
                    466: 
1.1.1.3   root      467:     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
                    468:     r = rados_create(&s->cluster, clientname);
1.1.1.2   root      469:     if (r < 0) {
1.1       root      470:         error_report("error initializing");
                    471:         return r;
                    472:     }
                    473: 
1.1.1.3   root      474:     s->snap = NULL;
                    475:     if (snap_buf[0] != '\0') {
                    476:         s->snap = g_strdup(snap_buf);
                    477:     }
                    478: 
1.1.1.2   root      479:     if (strstr(conf, "conf=") == NULL) {
1.1.1.3   root      480:         /* try default location, but ignore failure */
                    481:         rados_conf_read_file(s->cluster, NULL);
1.1       root      482:     }
                    483: 
1.1.1.2   root      484:     if (conf[0] != '\0') {
                    485:         r = qemu_rbd_set_conf(s->cluster, conf);
                    486:         if (r < 0) {
                    487:             error_report("error setting config options");
1.1.1.3   root      488:             goto failed_shutdown;
1.1.1.2   root      489:         }
1.1       root      490:     }
                    491: 
1.1.1.2   root      492:     r = rados_connect(s->cluster);
                    493:     if (r < 0) {
                    494:         error_report("error connecting");
1.1.1.3   root      495:         goto failed_shutdown;
1.1       root      496:     }
                    497: 
1.1.1.2   root      498:     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
                    499:     if (r < 0) {
                    500:         error_report("error opening pool %s", pool);
1.1.1.3   root      501:         goto failed_shutdown;
1.1       root      502:     }
                    503: 
1.1.1.2   root      504:     r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
1.1       root      505:     if (r < 0) {
1.1.1.2   root      506:         error_report("error reading header from %s", s->name);
1.1.1.3   root      507:         goto failed_open;
1.1       root      508:     }
                    509: 
1.1.1.2   root      510:     bs->read_only = (s->snap != NULL);
1.1       root      511: 
                    512:     s->event_reader_pos = 0;
                    513:     r = qemu_pipe(s->fds);
                    514:     if (r < 0) {
                    515:         error_report("error opening eventfd");
                    516:         goto failed;
                    517:     }
                    518:     fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
                    519:     fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
1.1.1.2   root      520:     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
1.1.1.4 ! root      521:                             NULL, qemu_rbd_aio_flush_cb, s);
1.1       root      522: 
                    523: 
                    524:     return 0;
                    525: 
                    526: failed:
1.1.1.2   root      527:     rbd_close(s->image);
1.1.1.3   root      528: failed_open:
1.1.1.2   root      529:     rados_ioctx_destroy(s->io_ctx);
1.1.1.3   root      530: failed_shutdown:
1.1.1.2   root      531:     rados_shutdown(s->cluster);
1.1.1.3   root      532:     g_free(s->snap);
1.1       root      533:     return r;
                    534: }
                    535: 
1.1.1.2   root      536: static void qemu_rbd_close(BlockDriverState *bs)
1.1       root      537: {
                    538:     BDRVRBDState *s = bs->opaque;
                    539: 
                    540:     close(s->fds[0]);
                    541:     close(s->fds[1]);
1.1.1.4 ! root      542:     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL);
1.1       root      543: 
1.1.1.2   root      544:     rbd_close(s->image);
                    545:     rados_ioctx_destroy(s->io_ctx);
1.1.1.3   root      546:     g_free(s->snap);
1.1.1.2   root      547:     rados_shutdown(s->cluster);
1.1       root      548: }
                    549: 
                    550: /*
                    551:  * Cancel aio. Since we don't reference acb in a non qemu threads,
                    552:  * it is safe to access it here.
                    553:  */
1.1.1.2   root      554: static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
1.1       root      555: {
                    556:     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
                    557:     acb->cancelled = 1;
                    558: }
                    559: 
                    560: static AIOPool rbd_aio_pool = {
                    561:     .aiocb_size = sizeof(RBDAIOCB),
1.1.1.2   root      562:     .cancel = qemu_rbd_aio_cancel,
1.1       root      563: };
                    564: 
1.1.1.2   root      565: static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
1.1       root      566: {
1.1.1.2   root      567:     int ret = 0;
1.1       root      568:     while (1) {
                    569:         fd_set wfd;
1.1.1.2   root      570:         int fd = s->fds[RBD_FD_WRITE];
1.1       root      571: 
1.1.1.2   root      572:         /* send the op pointer to the qemu thread that is responsible
                    573:            for the aio/op completion. Must do it in a qemu thread context */
1.1       root      574:         ret = write(fd, (void *)&rcb, sizeof(rcb));
                    575:         if (ret >= 0) {
                    576:             break;
                    577:         }
                    578:         if (errno == EINTR) {
                    579:             continue;
1.1.1.2   root      580:         }
1.1       root      581:         if (errno != EAGAIN) {
                    582:             break;
1.1.1.2   root      583:         }
1.1       root      584: 
                    585:         FD_ZERO(&wfd);
                    586:         FD_SET(fd, &wfd);
                    587:         do {
                    588:             ret = select(fd + 1, NULL, &wfd, NULL, NULL);
                    589:         } while (ret < 0 && errno == EINTR);
                    590:     }
                    591: 
1.1.1.2   root      592:     return ret;
                    593: }
                    594: 
                    595: /*
                    596:  * This is the callback function for rbd_aio_read and _write
                    597:  *
                    598:  * Note: this function is being called from a non qemu thread so
                    599:  * we need to be careful about what we do here. Generally we only
                    600:  * write to the block notification pipe, and do the rest of the
                    601:  * io completion handling from qemu_rbd_aio_event_reader() which
                    602:  * runs in a qemu context.
                    603:  */
                    604: static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
                    605: {
                    606:     int ret;
                    607:     rcb->ret = rbd_aio_get_return_value(c);
                    608:     rbd_aio_release(c);
                    609:     ret = qemu_rbd_send_pipe(rcb->s, rcb);
1.1       root      610:     if (ret < 0) {
1.1.1.2   root      611:         error_report("failed writing to acb->s->fds");
1.1.1.3   root      612:         g_free(rcb);
1.1       root      613:     }
                    614: }
                    615: 
1.1.1.2   root      616: /* Callback when all queued rbd_aio requests are complete */
1.1       root      617: 
                    618: static void rbd_aio_bh_cb(void *opaque)
                    619: {
                    620:     RBDAIOCB *acb = opaque;
                    621: 
1.1.1.4 ! root      622:     if (acb->cmd == RBD_AIO_READ) {
1.1       root      623:         qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
                    624:     }
                    625:     qemu_vfree(acb->bounce);
                    626:     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
                    627:     qemu_bh_delete(acb->bh);
                    628:     acb->bh = NULL;
                    629: 
                    630:     qemu_aio_release(acb);
                    631: }
                    632: 
1.1.1.4 ! root      633: static int rbd_aio_discard_wrapper(rbd_image_t image,
        !           634:                                    uint64_t off,
        !           635:                                    uint64_t len,
        !           636:                                    rbd_completion_t comp)
        !           637: {
        !           638: #ifdef LIBRBD_SUPPORTS_DISCARD
        !           639:     return rbd_aio_discard(image, off, len, comp);
        !           640: #else
        !           641:     return -ENOTSUP;
        !           642: #endif
        !           643: }
        !           644: 
        !           645: static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
        !           646:                                        int64_t sector_num,
        !           647:                                        QEMUIOVector *qiov,
        !           648:                                        int nb_sectors,
        !           649:                                        BlockDriverCompletionFunc *cb,
        !           650:                                        void *opaque,
        !           651:                                        RBDAIOCmd cmd)
1.1       root      652: {
                    653:     RBDAIOCB *acb;
                    654:     RADOSCB *rcb;
1.1.1.2   root      655:     rbd_completion_t c;
1.1       root      656:     int64_t off, size;
                    657:     char *buf;
1.1.1.2   root      658:     int r;
1.1       root      659: 
                    660:     BDRVRBDState *s = bs->opaque;
                    661: 
                    662:     acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
1.1.1.4 ! root      663:     acb->cmd = cmd;
1.1       root      664:     acb->qiov = qiov;
1.1.1.4 ! root      665:     if (cmd == RBD_AIO_DISCARD) {
        !           666:         acb->bounce = NULL;
        !           667:     } else {
        !           668:         acb->bounce = qemu_blockalign(bs, qiov->size);
        !           669:     }
1.1       root      670:     acb->ret = 0;
                    671:     acb->error = 0;
                    672:     acb->s = s;
                    673:     acb->cancelled = 0;
                    674:     acb->bh = NULL;
                    675: 
1.1.1.4 ! root      676:     if (cmd == RBD_AIO_WRITE) {
1.1       root      677:         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
                    678:     }
                    679: 
                    680:     buf = acb->bounce;
                    681: 
                    682:     off = sector_num * BDRV_SECTOR_SIZE;
                    683:     size = nb_sectors * BDRV_SECTOR_SIZE;
                    684: 
1.1.1.2   root      685:     s->qemu_aio_count++; /* All the RADOSCB */
1.1       root      686: 
1.1.1.3   root      687:     rcb = g_malloc(sizeof(RADOSCB));
1.1.1.2   root      688:     rcb->done = 0;
                    689:     rcb->acb = acb;
                    690:     rcb->buf = buf;
                    691:     rcb->s = acb->s;
                    692:     rcb->size = size;
                    693:     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
                    694:     if (r < 0) {
                    695:         goto failed;
                    696:     }
                    697: 
1.1.1.4 ! root      698:     switch (cmd) {
        !           699:     case RBD_AIO_WRITE:
1.1.1.2   root      700:         r = rbd_aio_write(s->image, off, size, buf, c);
1.1.1.4 ! root      701:         break;
        !           702:     case RBD_AIO_READ:
1.1.1.2   root      703:         r = rbd_aio_read(s->image, off, size, buf, c);
1.1.1.4 ! root      704:         break;
        !           705:     case RBD_AIO_DISCARD:
        !           706:         r = rbd_aio_discard_wrapper(s->image, off, size, c);
        !           707:         break;
        !           708:     default:
        !           709:         r = -EINVAL;
1.1.1.2   root      710:     }
1.1       root      711: 
1.1.1.2   root      712:     if (r < 0) {
                    713:         goto failed;
1.1       root      714:     }
                    715: 
                    716:     return &acb->common;
1.1.1.2   root      717: 
                    718: failed:
1.1.1.3   root      719:     g_free(rcb);
1.1.1.2   root      720:     s->qemu_aio_count--;
                    721:     qemu_aio_release(acb);
                    722:     return NULL;
1.1       root      723: }
                    724: 
1.1.1.2   root      725: static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                    726:                                             int64_t sector_num,
                    727:                                             QEMUIOVector *qiov,
                    728:                                             int nb_sectors,
                    729:                                             BlockDriverCompletionFunc *cb,
                    730:                                             void *opaque)
1.1       root      731: {
1.1.1.4 ! root      732:     return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
        !           733:                          RBD_AIO_READ);
1.1       root      734: }
                    735: 
1.1.1.2   root      736: static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                    737:                                              int64_t sector_num,
                    738:                                              QEMUIOVector *qiov,
                    739:                                              int nb_sectors,
                    740:                                              BlockDriverCompletionFunc *cb,
                    741:                                              void *opaque)
1.1       root      742: {
1.1.1.4 ! root      743:     return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
        !           744:                          RBD_AIO_WRITE);
1.1       root      745: }
                    746: 
1.1.1.3   root      747: static int qemu_rbd_co_flush(BlockDriverState *bs)
                    748: {
                    749: #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
                    750:     /* rbd_flush added in 0.1.1 */
                    751:     BDRVRBDState *s = bs->opaque;
                    752:     return rbd_flush(s->image);
                    753: #else
                    754:     return 0;
                    755: #endif
                    756: }
                    757: 
1.1.1.2   root      758: static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1.1       root      759: {
                    760:     BDRVRBDState *s = bs->opaque;
1.1.1.2   root      761:     rbd_image_info_t info;
                    762:     int r;
                    763: 
                    764:     r = rbd_stat(s->image, &info, sizeof(info));
                    765:     if (r < 0) {
                    766:         return r;
                    767:     }
                    768: 
                    769:     bdi->cluster_size = info.obj_size;
1.1       root      770:     return 0;
                    771: }
                    772: 
1.1.1.2   root      773: static int64_t qemu_rbd_getlength(BlockDriverState *bs)
                    774: {
                    775:     BDRVRBDState *s = bs->opaque;
                    776:     rbd_image_info_t info;
                    777:     int r;
                    778: 
                    779:     r = rbd_stat(s->image, &info, sizeof(info));
                    780:     if (r < 0) {
                    781:         return r;
                    782:     }
                    783: 
                    784:     return info.size;
                    785: }
                    786: 
                    787: static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
1.1       root      788: {
                    789:     BDRVRBDState *s = bs->opaque;
1.1.1.2   root      790:     int r;
                    791: 
                    792:     r = rbd_resize(s->image, offset);
                    793:     if (r < 0) {
                    794:         return r;
                    795:     }
1.1       root      796: 
1.1.1.2   root      797:     return 0;
1.1       root      798: }
                    799: 
1.1.1.2   root      800: static int qemu_rbd_snap_create(BlockDriverState *bs,
                    801:                                 QEMUSnapshotInfo *sn_info)
1.1       root      802: {
                    803:     BDRVRBDState *s = bs->opaque;
                    804:     int r;
                    805: 
                    806:     if (sn_info->name[0] == '\0') {
                    807:         return -EINVAL; /* we need a name for rbd snapshots */
                    808:     }
                    809: 
                    810:     /*
                    811:      * rbd snapshots are using the name as the user controlled unique identifier
                    812:      * we can't use the rbd snapid for that purpose, as it can't be set
                    813:      */
                    814:     if (sn_info->id_str[0] != '\0' &&
                    815:         strcmp(sn_info->id_str, sn_info->name) != 0) {
                    816:         return -EINVAL;
                    817:     }
                    818: 
                    819:     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
                    820:         return -ERANGE;
                    821:     }
                    822: 
1.1.1.2   root      823:     r = rbd_snap_create(s->image, sn_info->name);
1.1       root      824:     if (r < 0) {
1.1.1.2   root      825:         error_report("failed to create snap: %s", strerror(-r));
1.1       root      826:         return r;
                    827:     }
                    828: 
                    829:     return 0;
                    830: }
                    831: 
1.1.1.4 ! root      832: static int qemu_rbd_snap_remove(BlockDriverState *bs,
        !           833:                                 const char *snapshot_name)
        !           834: {
        !           835:     BDRVRBDState *s = bs->opaque;
        !           836:     int r;
        !           837: 
        !           838:     r = rbd_snap_remove(s->image, snapshot_name);
        !           839:     return r;
        !           840: }
        !           841: 
        !           842: static int qemu_rbd_snap_rollback(BlockDriverState *bs,
        !           843:                                   const char *snapshot_name)
        !           844: {
        !           845:     BDRVRBDState *s = bs->opaque;
        !           846:     int r;
        !           847: 
        !           848:     r = rbd_snap_rollback(s->image, snapshot_name);
        !           849:     return r;
        !           850: }
        !           851: 
1.1.1.2   root      852: static int qemu_rbd_snap_list(BlockDriverState *bs,
                    853:                               QEMUSnapshotInfo **psn_tab)
1.1       root      854: {
                    855:     BDRVRBDState *s = bs->opaque;
                    856:     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1.1.1.2   root      857:     int i, snap_count;
                    858:     rbd_snap_info_t *snaps;
                    859:     int max_snaps = RBD_MAX_SNAPS;
1.1       root      860: 
1.1.1.2   root      861:     do {
1.1.1.3   root      862:         snaps = g_malloc(sizeof(*snaps) * max_snaps);
1.1.1.2   root      863:         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
                    864:         if (snap_count < 0) {
1.1.1.3   root      865:             g_free(snaps);
1.1       root      866:         }
1.1.1.2   root      867:     } while (snap_count == -ERANGE);
1.1       root      868: 
1.1.1.2   root      869:     if (snap_count <= 0) {
1.1.1.3   root      870:         goto done;
1.1       root      871:     }
                    872: 
1.1.1.3   root      873:     sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
1.1       root      874: 
1.1.1.2   root      875:     for (i = 0; i < snap_count; i++) {
                    876:         const char *snap_name = snaps[i].name;
1.1       root      877: 
                    878:         sn_info = sn_tab + i;
                    879:         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
                    880:         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
                    881: 
1.1.1.2   root      882:         sn_info->vm_state_size = snaps[i].size;
1.1       root      883:         sn_info->date_sec = 0;
                    884:         sn_info->date_nsec = 0;
                    885:         sn_info->vm_clock_nsec = 0;
                    886:     }
1.1.1.2   root      887:     rbd_snap_list_end(snaps);
                    888: 
1.1.1.3   root      889:  done:
1.1       root      890:     *psn_tab = sn_tab;
                    891:     return snap_count;
                    892: }
                    893: 
1.1.1.4 ! root      894: #ifdef LIBRBD_SUPPORTS_DISCARD
        !           895: static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
        !           896:                                               int64_t sector_num,
        !           897:                                               int nb_sectors,
        !           898:                                               BlockDriverCompletionFunc *cb,
        !           899:                                               void *opaque)
        !           900: {
        !           901:     return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
        !           902:                          RBD_AIO_DISCARD);
        !           903: }
        !           904: #endif
        !           905: 
1.1.1.2   root      906: static QEMUOptionParameter qemu_rbd_create_options[] = {
1.1       root      907:     {
                    908:      .name = BLOCK_OPT_SIZE,
                    909:      .type = OPT_SIZE,
                    910:      .help = "Virtual disk size"
                    911:     },
                    912:     {
                    913:      .name = BLOCK_OPT_CLUSTER_SIZE,
                    914:      .type = OPT_SIZE,
                    915:      .help = "RBD object size"
                    916:     },
                    917:     {NULL}
                    918: };
                    919: 
                    920: static BlockDriver bdrv_rbd = {
                    921:     .format_name        = "rbd",
                    922:     .instance_size      = sizeof(BDRVRBDState),
1.1.1.2   root      923:     .bdrv_file_open     = qemu_rbd_open,
                    924:     .bdrv_close         = qemu_rbd_close,
                    925:     .bdrv_create        = qemu_rbd_create,
                    926:     .bdrv_get_info      = qemu_rbd_getinfo,
                    927:     .create_options     = qemu_rbd_create_options,
                    928:     .bdrv_getlength     = qemu_rbd_getlength,
                    929:     .bdrv_truncate      = qemu_rbd_truncate,
1.1       root      930:     .protocol_name      = "rbd",
                    931: 
1.1.1.3   root      932:     .bdrv_aio_readv         = qemu_rbd_aio_readv,
                    933:     .bdrv_aio_writev        = qemu_rbd_aio_writev,
                    934:     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1.1       root      935: 
1.1.1.4 ! root      936: #ifdef LIBRBD_SUPPORTS_DISCARD
        !           937:     .bdrv_aio_discard       = qemu_rbd_aio_discard,
        !           938: #endif
        !           939: 
1.1.1.3   root      940:     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1.1.1.4 ! root      941:     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1.1.1.3   root      942:     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1.1.1.4 ! root      943:     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1.1       root      944: };
                    945: 
                    946: static void bdrv_rbd_init(void)
                    947: {
                    948:     bdrv_register(&bdrv_rbd);
                    949: }
                    950: 
                    951: block_init(bdrv_rbd_init);

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.