|
|
1.1 root 1: /*
2: * QEMU Block driver for RADOS (Ceph)
3: *
1.1.1.2 root 4: * Copyright (C) 2010-2011 Christian Brunner <[email protected]>,
5: * Josh Durgin <[email protected]>
1.1 root 6: *
7: * This work is licensed under the terms of the GNU GPL, version 2. See
8: * the COPYING file in the top-level directory.
9: *
1.1.1.4 ! root 10: * Contributions after 2012-01-13 are licensed under the terms of the
! 11: * GNU GPL, version 2 or (at your option) any later version.
1.1 root 12: */
13:
1.1.1.2 root 14: #include <inttypes.h>
15:
1.1 root 16: #include "qemu-common.h"
17: #include "qemu-error.h"
18: #include "block_int.h"
19:
1.1.1.2 root 20: #include <rbd/librbd.h>
1.1 root 21:
22: /*
23: * When specifying the image filename use:
24: *
1.1.1.2 root 25: * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
1.1 root 26: *
1.1.1.3 root 27: * poolname must be the name of an existing rados pool.
1.1 root 28: *
1.1.1.3 root 29: * devicename is the name of the rbd image.
1.1 root 30: *
1.1.1.3 root 31: * Each option given is used to configure rados, and may be any valid
32: * Ceph option, "id", or "conf".
1.1.1.2 root 33: *
1.1.1.3 root 34: * The "id" option indicates what user we should authenticate as to
35: * the Ceph cluster. If it is excluded we will use the Ceph default
36: * (normally 'admin').
1.1 root 37: *
1.1.1.3 root 38: * The "conf" option specifies a Ceph configuration file to read. If
39: * it is not specified, we will read from the default Ceph locations
40: * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration
41: * file, specify conf=/dev/null.
1.1 root 42: *
1.1.1.3 root 43: * Configuration values containing :, @, or = can be escaped with a
44: * leading "\".
1.1 root 45: */
46:
/*
 * Compile-time feature test for asynchronous discard.
 *
 * rbd_aio_discard added in 0.1.2, so LIBRBD_SUPPORTS_DISCARD is defined only
 * when the librbd headers report at least that version.
 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif
! 53:
/*
 * NOTE(review): OBJ_MAX_SIZE is not referenced by the code in this file and
 * OBJ_DEFAULT_OBJ_ORDER is not defined here - confirm it is still needed.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* Sizes (in bytes, including the terminating NUL) of the fixed buffers
 * used while parsing the "rbd:..." filename and its option list. */
#define RBD_MAX_CONF_NAME_SIZE 128  /* one option name */
#define RBD_MAX_CONF_VAL_SIZE 512   /* one option value */
#define RBD_MAX_CONF_SIZE 1024      /* the whole option string */
#define RBD_MAX_POOL_NAME_SIZE 128  /* pool name */
#define RBD_MAX_SNAP_NAME_SIZE 128  /* snapshot name */
/* Initial number of entries allocated when listing snapshots */
#define RBD_MAX_SNAPS 100
62:
/* Kind of asynchronous request handled by rbd_start_aio() */
typedef enum {
    RBD_AIO_READ,    /* submitted through rbd_aio_read() */
    RBD_AIO_WRITE,   /* submitted through rbd_aio_write() */
    RBD_AIO_DISCARD  /* submitted through rbd_aio_discard_wrapper() */
} RBDAIOCmd;
! 68:
/*
 * Per-request state of one qemu aio request issued against an rbd image.
 *
 * 'common' has to stay the first member: qemu_rbd_aio_cancel() casts a
 * BlockDriverAIOCB pointer straight to RBDAIOCB.
 */
typedef struct RBDAIOCB {
    BlockDriverAIOCB common;
    QEMUBH *bh;              /* bottom half created when the request completes */
    int ret;                 /* result later handed to the completion callback */
    QEMUIOVector *qiov;      /* caller's I/O vector (NULL for discard) */
    char *bounce;            /* linear bounce buffer (NULL for discard) */
    RBDAIOCmd cmd;           /* read, write or discard */
    int64_t sector_num;
    int error;               /* set to 1 once a negative result was recorded */
    struct BDRVRBDState *s;  /* driver state of the owning block device */
    int cancelled;           /* set by qemu_rbd_aio_cancel() */
} RBDAIOCB;
81:
/*
 * Context handed to librbd for one submitted operation. A pointer to it is
 * sent through the notification pipe once librbd reports completion.
 */
typedef struct RADOSCB {
    int rcbid;
    RBDAIOCB *acb;            /* qemu request this operation belongs to */
    struct BDRVRBDState *s;   /* driver state (owner of the pipe) */
    int done;
    int64_t size;             /* number of bytes requested */
    char *buf;                /* data buffer (the acb's bounce buffer) */
    int ret;                  /* value of rbd_aio_get_return_value() */
} RADOSCB;
91:
/* Indices into BDRVRBDState.fds: read end and write end of the pipe */
#define RBD_FD_READ 0
#define RBD_FD_WRITE 1
94:
/* Per-device driver state, stored in BlockDriverState.opaque */
typedef struct BDRVRBDState {
    int fds[2];                         /* completion notification pipe */
    rados_t cluster;                    /* handle from rados_create() */
    rados_ioctx_t io_ctx;               /* I/O context of the pool */
    rbd_image_t image;                  /* handle from rbd_open() */
    char name[RBD_MAX_IMAGE_NAME_SIZE]; /* image name */
    int qemu_aio_count;                 /* submitted, not yet completed requests */
    char *snap;                         /* snapshot name, NULL if none given */
    int event_reader_pos;               /* bytes of event_rcb read so far */
    RADOSCB *event_rcb;                 /* pointer being read from the pipe */
} BDRVRBDState;
106:
/* Forward declaration: qemu_rbd_complete_aio() schedules this bottom half */
static void rbd_aio_bh_cb(void *opaque);
108:
/*
 * Split the next token off src and copy it into dst.
 *
 * When delim is not '\0', src is cut at the first delim that is not preceded
 * by a backslash: the delimiter is overwritten with NUL and *p is set to the
 * character following it. If no such delimiter exists, or delim is '\0',
 * the whole of src is the token and *p is set to NULL.
 *
 * name is only used in the error messages.
 *
 * Returns 0 on success, -EINVAL if the token is empty or does not fit into
 * dst_len bytes (including the terminating NUL).
 */
static int qemu_rbd_next_tok(char *dst, int dst_len,
                             char *src, char delim,
                             const char *name,
                             char **p)
{
    char *cursor = src;
    int token_len;

    *p = NULL;

    if (delim != '\0') {
        while (*cursor != '\0' && *cursor != delim) {
            /* a backslash protects the character that follows it */
            if (*cursor == '\\' && cursor[1] != '\0') {
                cursor++;
            }
            cursor++;
        }
        if (*cursor == delim) {
            *cursor = '\0';
            *p = cursor + 1;
        }
    }

    token_len = strlen(src);
    if (token_len >= dst_len) {
        error_report("%s too long", name);
        return -EINVAL;
    }
    if (token_len == 0) {
        error_report("%s too short", name);
        return -EINVAL;
    }

    pstrcpy(dst, dst_len, src);

    return 0;
}
146:
/*
 * Remove backslash escapes from src in place: every backslash that is
 * followed by another character is dropped and that character is kept
 * literally. A backslash at the very end of the string is kept.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
159:
/*
 * Return the first '@' or ':' in str that is not escaped with a backslash,
 * or '\0' if there is none. Uses the same escaping rule as
 * qemu_rbd_next_tok().
 */
static char qemu_rbd_first_separator(const char *str)
{
    for (; *str != '\0'; str++) {
        if (*str == '@' || *str == ':') {
            return *str;
        }
        if (*str == '\\' && str[1] != '\0') {
            str++;
        }
    }
    return '\0';
}

/*
 * Parse a filename of the form
 *   rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * pool, snap and name receive the unescaped pool, snapshot and image names;
 * conf receives the raw (still escaped) option string. snap and conf are set
 * to the empty string when the corresponding part is absent.
 *
 * Returns 0 on success and -EINVAL if the filename does not start with
 * "rbd:", has no pool name, or a component is empty or too long.
 */
static int qemu_rbd_parsename(const char *filename,
                              char *pool, int pool_len,
                              char *snap, int snap_len,
                              char *name, int name_len,
                              char *conf, int conf_len)
{
    const char *start;
    char *p, *buf;
    int ret;

    if (!strstart(filename, "rbd:", &start)) {
        return -EINVAL;
    }

    buf = g_strdup(start);
    p = buf;
    *snap = '\0';
    *conf = '\0';

    ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
    if (ret < 0 || !p) {
        ret = -EINVAL;
        goto done;
    }
    qemu_rbd_unescape(pool);

    /*
     * A snapshot name is present only if an unescaped '@' comes before the
     * option list. A plain strchr() would also match an escaped '@' or one
     * inside an option value; the tokenizer then finds no delimiter, leaves
     * p == NULL and the snapshot parse would dereference it.
     */
    if (qemu_rbd_first_separator(p) == '@') {
        ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
        if (ret < 0) {
            goto done;
        }
        if (!p) {
            /* cannot happen given the check above; stay safe regardless */
            ret = -EINVAL;
            goto done;
        }
        ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
        qemu_rbd_unescape(snap);
    } else {
        ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
    }
    qemu_rbd_unescape(name);
    if (ret < 0 || !p) {
        goto done;
    }

    /* everything that is left is the option list */
    ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);

done:
    g_free(buf);
    return ret;
}
207:
/*
 * Look for an "id=<value>" entry in the ':'-separated option string conf.
 *
 * The value of the first such entry is copied verbatim (no unescaping)
 * into clientname, which must be large enough to hold it.
 *
 * Returns clientname if an entry was found, NULL otherwise.
 */
static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
{
    const size_t key_len = 3; /* strlen("id=") */
    const char *seg = conf;

    while (*seg != '\0') {
        const char *colon = strchr(seg, ':');
        size_t seg_len = colon ? (size_t)(colon - seg) : strlen(seg);

        if (strncmp(seg, "id=", key_len) == 0) {
            size_t id_len = seg_len - key_len;

            memcpy(clientname, seg + key_len, id_len);
            clientname[id_len] = '\0';
            return clientname;
        }
        if (!colon) {
            break;
        }
        seg = colon + 1;
    }
    return NULL;
}
235:
1.1.1.2 root 236: static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
1.1 root 237: {
1.1.1.2 root 238: char *p, *buf;
239: char name[RBD_MAX_CONF_NAME_SIZE];
240: char value[RBD_MAX_CONF_VAL_SIZE];
241: int ret = 0;
1.1 root 242:
1.1.1.3 root 243: buf = g_strdup(conf);
1.1.1.2 root 244: p = buf;
1.1 root 245:
1.1.1.2 root 246: while (p) {
247: ret = qemu_rbd_next_tok(name, sizeof(name), p,
248: '=', "conf option name", &p);
249: if (ret < 0) {
250: break;
251: }
1.1.1.3 root 252: qemu_rbd_unescape(name);
1.1 root 253:
1.1.1.2 root 254: if (!p) {
255: error_report("conf option %s has no value", name);
256: ret = -EINVAL;
257: break;
258: }
1.1 root 259:
1.1.1.2 root 260: ret = qemu_rbd_next_tok(value, sizeof(value), p,
261: ':', "conf option value", &p);
262: if (ret < 0) {
263: break;
264: }
1.1.1.3 root 265: qemu_rbd_unescape(value);
1.1 root 266:
1.1.1.3 root 267: if (strcmp(name, "conf") == 0) {
268: ret = rados_conf_read_file(cluster, value);
1.1.1.2 root 269: if (ret < 0) {
1.1.1.3 root 270: error_report("error reading conf file %s", value);
1.1.1.2 root 271: break;
272: }
1.1.1.3 root 273: } else if (strcmp(name, "id") == 0) {
274: /* ignore, this is parsed by qemu_rbd_parse_clientname() */
1.1.1.2 root 275: } else {
1.1.1.3 root 276: ret = rados_conf_set(cluster, name, value);
1.1.1.2 root 277: if (ret < 0) {
1.1.1.3 root 278: error_report("invalid conf option %s", name);
279: ret = -EINVAL;
1.1.1.2 root 280: break;
281: }
282: }
1.1 root 283: }
284:
1.1.1.3 root 285: g_free(buf);
1.1 root 286: return ret;
287: }
288:
1.1.1.2 root 289: static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
1.1 root 290: {
291: int64_t bytes = 0;
292: int64_t objsize;
1.1.1.2 root 293: int obj_order = 0;
294: char pool[RBD_MAX_POOL_NAME_SIZE];
295: char name[RBD_MAX_IMAGE_NAME_SIZE];
296: char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
297: char conf[RBD_MAX_CONF_SIZE];
1.1.1.3 root 298: char clientname_buf[RBD_MAX_CONF_SIZE];
299: char *clientname;
1.1.1.2 root 300: rados_t cluster;
301: rados_ioctx_t io_ctx;
1.1 root 302: int ret;
303:
1.1.1.2 root 304: if (qemu_rbd_parsename(filename, pool, sizeof(pool),
305: snap_buf, sizeof(snap_buf),
306: name, sizeof(name),
307: conf, sizeof(conf)) < 0) {
1.1 root 308: return -EINVAL;
309: }
310:
311: /* Read out options */
312: while (options && options->name) {
313: if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
314: bytes = options->value.n;
315: } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
316: if (options->value.n) {
317: objsize = options->value.n;
318: if ((objsize - 1) & objsize) { /* not a power of 2? */
319: error_report("obj size needs to be power of 2");
320: return -EINVAL;
321: }
322: if (objsize < 4096) {
323: error_report("obj size too small");
324: return -EINVAL;
325: }
1.1.1.2 root 326: obj_order = ffs(objsize) - 1;
1.1 root 327: }
328: }
329: options++;
330: }
331:
1.1.1.3 root 332: clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
333: if (rados_create(&cluster, clientname) < 0) {
1.1 root 334: error_report("error initializing");
335: return -EIO;
336: }
337:
1.1.1.2 root 338: if (strstr(conf, "conf=") == NULL) {
1.1.1.3 root 339: /* try default location, but ignore failure */
340: rados_conf_read_file(cluster, NULL);
1.1 root 341: }
342:
1.1.1.2 root 343: if (conf[0] != '\0' &&
344: qemu_rbd_set_conf(cluster, conf) < 0) {
345: error_report("error setting config options");
346: rados_shutdown(cluster);
347: return -EIO;
1.1 root 348: }
349:
1.1.1.2 root 350: if (rados_connect(cluster) < 0) {
351: error_report("error connecting");
352: rados_shutdown(cluster);
1.1 root 353: return -EIO;
354: }
355:
1.1.1.2 root 356: if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
357: error_report("error opening pool %s", pool);
358: rados_shutdown(cluster);
359: return -EIO;
1.1 root 360: }
361:
1.1.1.2 root 362: ret = rbd_create(io_ctx, name, bytes, &obj_order);
363: rados_ioctx_destroy(io_ctx);
364: rados_shutdown(cluster);
1.1 root 365:
366: return ret;
367: }
368:
369: /*
1.1.1.2 root 370: * This aio completion is being called from qemu_rbd_aio_event_reader()
371: * and runs in qemu context. It schedules a bh, but just in case the aio
1.1 root 372: * was not cancelled before.
373: */
1.1.1.2 root 374: static void qemu_rbd_complete_aio(RADOSCB *rcb)
1.1 root 375: {
376: RBDAIOCB *acb = rcb->acb;
377: int64_t r;
378:
379: if (acb->cancelled) {
1.1.1.2 root 380: qemu_vfree(acb->bounce);
381: qemu_aio_release(acb);
1.1 root 382: goto done;
383: }
384:
385: r = rcb->ret;
386:
1.1.1.4 ! root 387: if (acb->cmd == RBD_AIO_WRITE ||
! 388: acb->cmd == RBD_AIO_DISCARD) {
1.1 root 389: if (r < 0) {
390: acb->ret = r;
391: acb->error = 1;
392: } else if (!acb->error) {
1.1.1.2 root 393: acb->ret = rcb->size;
1.1 root 394: }
395: } else {
1.1.1.2 root 396: if (r < 0) {
397: memset(rcb->buf, 0, rcb->size);
1.1 root 398: acb->ret = r;
399: acb->error = 1;
1.1.1.2 root 400: } else if (r < rcb->size) {
401: memset(rcb->buf + r, 0, rcb->size - r);
1.1 root 402: if (!acb->error) {
1.1.1.2 root 403: acb->ret = rcb->size;
1.1 root 404: }
405: } else if (!acb->error) {
1.1.1.2 root 406: acb->ret = r;
1.1 root 407: }
408: }
409: /* Note that acb->bh can be NULL in case where the aio was cancelled */
1.1.1.2 root 410: acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
411: qemu_bh_schedule(acb->bh);
1.1 root 412: done:
1.1.1.3 root 413: g_free(rcb);
1.1 root 414: }
415:
416: /*
417: * aio fd read handler. It runs in the qemu context and calls the
418: * completion handling of completed rados aio operations.
419: */
1.1.1.2 root 420: static void qemu_rbd_aio_event_reader(void *opaque)
1.1 root 421: {
422: BDRVRBDState *s = opaque;
423:
424: ssize_t ret;
425:
426: do {
427: char *p = (char *)&s->event_rcb;
428:
429: /* now read the rcb pointer that was sent from a non qemu thread */
1.1.1.3 root 430: ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
431: sizeof(s->event_rcb) - s->event_reader_pos);
432: if (ret > 0) {
433: s->event_reader_pos += ret;
434: if (s->event_reader_pos == sizeof(s->event_rcb)) {
435: s->event_reader_pos = 0;
436: qemu_rbd_complete_aio(s->event_rcb);
437: s->qemu_aio_count--;
1.1 root 438: }
439: }
440: } while (ret < 0 && errno == EINTR);
441: }
442:
1.1.1.2 root 443: static int qemu_rbd_aio_flush_cb(void *opaque)
1.1 root 444: {
445: BDRVRBDState *s = opaque;
446:
447: return (s->qemu_aio_count > 0);
448: }
449:
1.1.1.2 root 450: static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
1.1 root 451: {
452: BDRVRBDState *s = bs->opaque;
1.1.1.2 root 453: char pool[RBD_MAX_POOL_NAME_SIZE];
454: char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
455: char conf[RBD_MAX_CONF_SIZE];
1.1.1.3 root 456: char clientname_buf[RBD_MAX_CONF_SIZE];
457: char *clientname;
1.1 root 458: int r;
459:
1.1.1.2 root 460: if (qemu_rbd_parsename(filename, pool, sizeof(pool),
461: snap_buf, sizeof(snap_buf),
462: s->name, sizeof(s->name),
463: conf, sizeof(conf)) < 0) {
1.1 root 464: return -EINVAL;
465: }
466:
1.1.1.3 root 467: clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
468: r = rados_create(&s->cluster, clientname);
1.1.1.2 root 469: if (r < 0) {
1.1 root 470: error_report("error initializing");
471: return r;
472: }
473:
1.1.1.3 root 474: s->snap = NULL;
475: if (snap_buf[0] != '\0') {
476: s->snap = g_strdup(snap_buf);
477: }
478:
1.1.1.2 root 479: if (strstr(conf, "conf=") == NULL) {
1.1.1.3 root 480: /* try default location, but ignore failure */
481: rados_conf_read_file(s->cluster, NULL);
1.1 root 482: }
483:
1.1.1.2 root 484: if (conf[0] != '\0') {
485: r = qemu_rbd_set_conf(s->cluster, conf);
486: if (r < 0) {
487: error_report("error setting config options");
1.1.1.3 root 488: goto failed_shutdown;
1.1.1.2 root 489: }
1.1 root 490: }
491:
1.1.1.2 root 492: r = rados_connect(s->cluster);
493: if (r < 0) {
494: error_report("error connecting");
1.1.1.3 root 495: goto failed_shutdown;
1.1 root 496: }
497:
1.1.1.2 root 498: r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
499: if (r < 0) {
500: error_report("error opening pool %s", pool);
1.1.1.3 root 501: goto failed_shutdown;
1.1 root 502: }
503:
1.1.1.2 root 504: r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
1.1 root 505: if (r < 0) {
1.1.1.2 root 506: error_report("error reading header from %s", s->name);
1.1.1.3 root 507: goto failed_open;
1.1 root 508: }
509:
1.1.1.2 root 510: bs->read_only = (s->snap != NULL);
1.1 root 511:
512: s->event_reader_pos = 0;
513: r = qemu_pipe(s->fds);
514: if (r < 0) {
515: error_report("error opening eventfd");
516: goto failed;
517: }
518: fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
519: fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
1.1.1.2 root 520: qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
1.1.1.4 ! root 521: NULL, qemu_rbd_aio_flush_cb, s);
1.1 root 522:
523:
524: return 0;
525:
526: failed:
1.1.1.2 root 527: rbd_close(s->image);
1.1.1.3 root 528: failed_open:
1.1.1.2 root 529: rados_ioctx_destroy(s->io_ctx);
1.1.1.3 root 530: failed_shutdown:
1.1.1.2 root 531: rados_shutdown(s->cluster);
1.1.1.3 root 532: g_free(s->snap);
1.1 root 533: return r;
534: }
535:
1.1.1.2 root 536: static void qemu_rbd_close(BlockDriverState *bs)
1.1 root 537: {
538: BDRVRBDState *s = bs->opaque;
539:
540: close(s->fds[0]);
541: close(s->fds[1]);
1.1.1.4 ! root 542: qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL);
1.1 root 543:
1.1.1.2 root 544: rbd_close(s->image);
545: rados_ioctx_destroy(s->io_ctx);
1.1.1.3 root 546: g_free(s->snap);
1.1.1.2 root 547: rados_shutdown(s->cluster);
1.1 root 548: }
549:
550: /*
551: * Cancel aio. Since we don't reference acb in a non qemu threads,
552: * it is safe to access it here.
553: */
1.1.1.2 root 554: static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
1.1 root 555: {
556: RBDAIOCB *acb = (RBDAIOCB *) blockacb;
557: acb->cancelled = 1;
558: }
559:
/* Pool from which rbd_start_aio() obtains its RBDAIOCB objects */
static AIOPool rbd_aio_pool = {
    .aiocb_size = sizeof(RBDAIOCB),
    .cancel = qemu_rbd_aio_cancel,
};
564:
1.1.1.2 root 565: static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
1.1 root 566: {
1.1.1.2 root 567: int ret = 0;
1.1 root 568: while (1) {
569: fd_set wfd;
1.1.1.2 root 570: int fd = s->fds[RBD_FD_WRITE];
1.1 root 571:
1.1.1.2 root 572: /* send the op pointer to the qemu thread that is responsible
573: for the aio/op completion. Must do it in a qemu thread context */
1.1 root 574: ret = write(fd, (void *)&rcb, sizeof(rcb));
575: if (ret >= 0) {
576: break;
577: }
578: if (errno == EINTR) {
579: continue;
1.1.1.2 root 580: }
1.1 root 581: if (errno != EAGAIN) {
582: break;
1.1.1.2 root 583: }
1.1 root 584:
585: FD_ZERO(&wfd);
586: FD_SET(fd, &wfd);
587: do {
588: ret = select(fd + 1, NULL, &wfd, NULL, NULL);
589: } while (ret < 0 && errno == EINTR);
590: }
591:
1.1.1.2 root 592: return ret;
593: }
594:
595: /*
596: * This is the callback function for rbd_aio_read and _write
597: *
598: * Note: this function is being called from a non qemu thread so
599: * we need to be careful about what we do here. Generally we only
600: * write to the block notification pipe, and do the rest of the
601: * io completion handling from qemu_rbd_aio_event_reader() which
602: * runs in a qemu context.
603: */
604: static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
605: {
606: int ret;
607: rcb->ret = rbd_aio_get_return_value(c);
608: rbd_aio_release(c);
609: ret = qemu_rbd_send_pipe(rcb->s, rcb);
1.1 root 610: if (ret < 0) {
1.1.1.2 root 611: error_report("failed writing to acb->s->fds");
1.1.1.3 root 612: g_free(rcb);
1.1 root 613: }
614: }
615:
1.1.1.2 root 616: /* Callback when all queued rbd_aio requests are complete */
1.1 root 617:
618: static void rbd_aio_bh_cb(void *opaque)
619: {
620: RBDAIOCB *acb = opaque;
621:
1.1.1.4 ! root 622: if (acb->cmd == RBD_AIO_READ) {
1.1 root 623: qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
624: }
625: qemu_vfree(acb->bounce);
626: acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
627: qemu_bh_delete(acb->bh);
628: acb->bh = NULL;
629:
630: qemu_aio_release(acb);
631: }
632:
1.1.1.4 ! root 633: static int rbd_aio_discard_wrapper(rbd_image_t image,
! 634: uint64_t off,
! 635: uint64_t len,
! 636: rbd_completion_t comp)
! 637: {
! 638: #ifdef LIBRBD_SUPPORTS_DISCARD
! 639: return rbd_aio_discard(image, off, len, comp);
! 640: #else
! 641: return -ENOTSUP;
! 642: #endif
! 643: }
! 644:
! 645: static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
! 646: int64_t sector_num,
! 647: QEMUIOVector *qiov,
! 648: int nb_sectors,
! 649: BlockDriverCompletionFunc *cb,
! 650: void *opaque,
! 651: RBDAIOCmd cmd)
1.1 root 652: {
653: RBDAIOCB *acb;
654: RADOSCB *rcb;
1.1.1.2 root 655: rbd_completion_t c;
1.1 root 656: int64_t off, size;
657: char *buf;
1.1.1.2 root 658: int r;
1.1 root 659:
660: BDRVRBDState *s = bs->opaque;
661:
662: acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
1.1.1.4 ! root 663: acb->cmd = cmd;
1.1 root 664: acb->qiov = qiov;
1.1.1.4 ! root 665: if (cmd == RBD_AIO_DISCARD) {
! 666: acb->bounce = NULL;
! 667: } else {
! 668: acb->bounce = qemu_blockalign(bs, qiov->size);
! 669: }
1.1 root 670: acb->ret = 0;
671: acb->error = 0;
672: acb->s = s;
673: acb->cancelled = 0;
674: acb->bh = NULL;
675:
1.1.1.4 ! root 676: if (cmd == RBD_AIO_WRITE) {
1.1 root 677: qemu_iovec_to_buffer(acb->qiov, acb->bounce);
678: }
679:
680: buf = acb->bounce;
681:
682: off = sector_num * BDRV_SECTOR_SIZE;
683: size = nb_sectors * BDRV_SECTOR_SIZE;
684:
1.1.1.2 root 685: s->qemu_aio_count++; /* All the RADOSCB */
1.1 root 686:
1.1.1.3 root 687: rcb = g_malloc(sizeof(RADOSCB));
1.1.1.2 root 688: rcb->done = 0;
689: rcb->acb = acb;
690: rcb->buf = buf;
691: rcb->s = acb->s;
692: rcb->size = size;
693: r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
694: if (r < 0) {
695: goto failed;
696: }
697:
1.1.1.4 ! root 698: switch (cmd) {
! 699: case RBD_AIO_WRITE:
1.1.1.2 root 700: r = rbd_aio_write(s->image, off, size, buf, c);
1.1.1.4 ! root 701: break;
! 702: case RBD_AIO_READ:
1.1.1.2 root 703: r = rbd_aio_read(s->image, off, size, buf, c);
1.1.1.4 ! root 704: break;
! 705: case RBD_AIO_DISCARD:
! 706: r = rbd_aio_discard_wrapper(s->image, off, size, c);
! 707: break;
! 708: default:
! 709: r = -EINVAL;
1.1.1.2 root 710: }
1.1 root 711:
1.1.1.2 root 712: if (r < 0) {
713: goto failed;
1.1 root 714: }
715:
716: return &acb->common;
1.1.1.2 root 717:
718: failed:
1.1.1.3 root 719: g_free(rcb);
1.1.1.2 root 720: s->qemu_aio_count--;
721: qemu_aio_release(acb);
722: return NULL;
1.1 root 723: }
724:
1.1.1.2 root 725: static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
726: int64_t sector_num,
727: QEMUIOVector *qiov,
728: int nb_sectors,
729: BlockDriverCompletionFunc *cb,
730: void *opaque)
1.1 root 731: {
1.1.1.4 ! root 732: return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
! 733: RBD_AIO_READ);
1.1 root 734: }
735:
1.1.1.2 root 736: static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
737: int64_t sector_num,
738: QEMUIOVector *qiov,
739: int nb_sectors,
740: BlockDriverCompletionFunc *cb,
741: void *opaque)
1.1 root 742: {
1.1.1.4 ! root 743: return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
! 744: RBD_AIO_WRITE);
1.1 root 745: }
746:
1.1.1.3 root 747: static int qemu_rbd_co_flush(BlockDriverState *bs)
748: {
749: #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
750: /* rbd_flush added in 0.1.1 */
751: BDRVRBDState *s = bs->opaque;
752: return rbd_flush(s->image);
753: #else
754: return 0;
755: #endif
756: }
757:
1.1.1.2 root 758: static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1.1 root 759: {
760: BDRVRBDState *s = bs->opaque;
1.1.1.2 root 761: rbd_image_info_t info;
762: int r;
763:
764: r = rbd_stat(s->image, &info, sizeof(info));
765: if (r < 0) {
766: return r;
767: }
768:
769: bdi->cluster_size = info.obj_size;
1.1 root 770: return 0;
771: }
772:
1.1.1.2 root 773: static int64_t qemu_rbd_getlength(BlockDriverState *bs)
774: {
775: BDRVRBDState *s = bs->opaque;
776: rbd_image_info_t info;
777: int r;
778:
779: r = rbd_stat(s->image, &info, sizeof(info));
780: if (r < 0) {
781: return r;
782: }
783:
784: return info.size;
785: }
786:
787: static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
1.1 root 788: {
789: BDRVRBDState *s = bs->opaque;
1.1.1.2 root 790: int r;
791:
792: r = rbd_resize(s->image, offset);
793: if (r < 0) {
794: return r;
795: }
1.1 root 796:
1.1.1.2 root 797: return 0;
1.1 root 798: }
799:
1.1.1.2 root 800: static int qemu_rbd_snap_create(BlockDriverState *bs,
801: QEMUSnapshotInfo *sn_info)
1.1 root 802: {
803: BDRVRBDState *s = bs->opaque;
804: int r;
805:
806: if (sn_info->name[0] == '\0') {
807: return -EINVAL; /* we need a name for rbd snapshots */
808: }
809:
810: /*
811: * rbd snapshots are using the name as the user controlled unique identifier
812: * we can't use the rbd snapid for that purpose, as it can't be set
813: */
814: if (sn_info->id_str[0] != '\0' &&
815: strcmp(sn_info->id_str, sn_info->name) != 0) {
816: return -EINVAL;
817: }
818:
819: if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
820: return -ERANGE;
821: }
822:
1.1.1.2 root 823: r = rbd_snap_create(s->image, sn_info->name);
1.1 root 824: if (r < 0) {
1.1.1.2 root 825: error_report("failed to create snap: %s", strerror(-r));
1.1 root 826: return r;
827: }
828:
829: return 0;
830: }
831:
1.1.1.4 ! root 832: static int qemu_rbd_snap_remove(BlockDriverState *bs,
! 833: const char *snapshot_name)
! 834: {
! 835: BDRVRBDState *s = bs->opaque;
! 836: int r;
! 837:
! 838: r = rbd_snap_remove(s->image, snapshot_name);
! 839: return r;
! 840: }
! 841:
! 842: static int qemu_rbd_snap_rollback(BlockDriverState *bs,
! 843: const char *snapshot_name)
! 844: {
! 845: BDRVRBDState *s = bs->opaque;
! 846: int r;
! 847:
! 848: r = rbd_snap_rollback(s->image, snapshot_name);
! 849: return r;
! 850: }
! 851:
1.1.1.2 root 852: static int qemu_rbd_snap_list(BlockDriverState *bs,
853: QEMUSnapshotInfo **psn_tab)
1.1 root 854: {
855: BDRVRBDState *s = bs->opaque;
856: QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1.1.1.2 root 857: int i, snap_count;
858: rbd_snap_info_t *snaps;
859: int max_snaps = RBD_MAX_SNAPS;
1.1 root 860:
1.1.1.2 root 861: do {
1.1.1.3 root 862: snaps = g_malloc(sizeof(*snaps) * max_snaps);
1.1.1.2 root 863: snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
864: if (snap_count < 0) {
1.1.1.3 root 865: g_free(snaps);
1.1 root 866: }
1.1.1.2 root 867: } while (snap_count == -ERANGE);
1.1 root 868:
1.1.1.2 root 869: if (snap_count <= 0) {
1.1.1.3 root 870: goto done;
1.1 root 871: }
872:
1.1.1.3 root 873: sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
1.1 root 874:
1.1.1.2 root 875: for (i = 0; i < snap_count; i++) {
876: const char *snap_name = snaps[i].name;
1.1 root 877:
878: sn_info = sn_tab + i;
879: pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
880: pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
881:
1.1.1.2 root 882: sn_info->vm_state_size = snaps[i].size;
1.1 root 883: sn_info->date_sec = 0;
884: sn_info->date_nsec = 0;
885: sn_info->vm_clock_nsec = 0;
886: }
1.1.1.2 root 887: rbd_snap_list_end(snaps);
888:
1.1.1.3 root 889: done:
1.1 root 890: *psn_tab = sn_tab;
891: return snap_count;
892: }
893:
#ifdef LIBRBD_SUPPORTS_DISCARD
/* Asynchronous discard: forwards to rbd_start_aio() as RBD_AIO_DISCARD.
 * A discard carries no data, hence no I/O vector is passed. */
static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
                                              int64_t sector_num,
                                              int nb_sectors,
                                              BlockDriverCompletionFunc *cb,
                                              void *opaque)
{
    BlockDriverAIOCB *request;

    request = rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
                            RBD_AIO_DISCARD);
    return request;
}
#endif
! 905:
/* Options understood by qemu_rbd_create(); terminated by a NULL entry */
static QEMUOptionParameter qemu_rbd_create_options[] = {
    {
        .name = BLOCK_OPT_SIZE,
        .type = OPT_SIZE,
        .help = "Virtual disk size"
    },
    {
        .name = BLOCK_OPT_CLUSTER_SIZE,
        .type = OPT_SIZE,
        .help = "RBD object size"
    },
    {NULL}
};
919:
/* Driver description registered with the block layer for "rbd:" files */
static BlockDriver bdrv_rbd = {
    .format_name        = "rbd",
    .instance_size      = sizeof(BDRVRBDState),
    .bdrv_file_open     = qemu_rbd_open,
    .bdrv_close         = qemu_rbd_close,
    .bdrv_create        = qemu_rbd_create,
    .bdrv_get_info      = qemu_rbd_getinfo,
    .create_options     = qemu_rbd_create_options,
    .bdrv_getlength     = qemu_rbd_getlength,
    .bdrv_truncate      = qemu_rbd_truncate,
    .protocol_name      = "rbd",

    .bdrv_aio_readv         = qemu_rbd_aio_readv,
    .bdrv_aio_writev        = qemu_rbd_aio_writev,
    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,

    /* discard is only offered when librbd provides rbd_aio_discard */
#ifdef LIBRBD_SUPPORTS_DISCARD
    .bdrv_aio_discard       = qemu_rbd_aio_discard,
#endif

    .bdrv_snapshot_create   = qemu_rbd_snap_create,
    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
    .bdrv_snapshot_list     = qemu_rbd_snap_list,
    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
};
945:
/* Register the rbd driver with the block layer */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() into the block driver initialisation */
block_init(bdrv_rbd_init);
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.