|
|
1.1 root 1: /*
2: * QEMU Block driver for RADOS (Ceph)
3: *
4: * Copyright (C) 2010 Christian Brunner <[email protected]>
5: *
6: * This work is licensed under the terms of the GNU GPL, version 2. See
7: * the COPYING file in the top-level directory.
8: *
9: */
10:
11: #include "qemu-common.h"
12: #include "qemu-error.h"
13:
14: #include "rbd_types.h"
15: #include "block_int.h"
16:
17: #include <rados/librados.h>
18:
19:
20:
/*
 * When specifying the image filename use:
 *
 * rbd:poolname/devicename[@snapshotname]
 *
 * poolname must be the name of an existing rados pool.
 *
 * devicename is the name of the image.
 *
 * snapshotname is optional; when it is given, the image is opened
 * read-only at that snapshot.
 *
 * Metadata information (image size, ...) is stored in an
 * object with the name "devicename.rbd".
 *
 * The raw device is split into 4MB sized objects by default.
 * The sequence number is encoded as a 12-digit hex string and is
 * appended to the image's block name (stored in the header object),
 * separated by a dot, e.g. "rb.0.1.00000000000a".
 *
 */
40:
/*
 * Object size in bytes that corresponds to the default object order.
 * Built from RBD_DEFAULT_OBJ_ORDER, the constant the rest of this file uses
 * for the default order; the previous spelling (OBJ_DEFAULT_OBJ_ORDER) is
 * not referenced anywhere else and would fail as soon as the macro was used.
 */
#define OBJ_MAX_SIZE (1UL << RBD_DEFAULT_OBJ_ORDER)
42:
43: typedef struct RBDAIOCB {
44: BlockDriverAIOCB common;
45: QEMUBH *bh;
46: int ret;
47: QEMUIOVector *qiov;
48: char *bounce;
49: int write;
50: int64_t sector_num;
51: int aiocnt;
52: int error;
53: struct BDRVRBDState *s;
54: int cancelled;
55: } RBDAIOCB;
56:
57: typedef struct RADOSCB {
58: int rcbid;
59: RBDAIOCB *acb;
60: struct BDRVRBDState *s;
61: int done;
62: int64_t segsize;
63: char *buf;
64: int ret;
65: } RADOSCB;
66:
67: #define RBD_FD_READ 0
68: #define RBD_FD_WRITE 1
69:
70: typedef struct BDRVRBDState {
71: int fds[2];
72: rados_pool_t pool;
73: rados_pool_t header_pool;
74: char name[RBD_MAX_OBJ_NAME_SIZE];
75: char block_name[RBD_MAX_BLOCK_NAME_SIZE];
76: uint64_t size;
77: uint64_t objsize;
78: int qemu_aio_count;
79: int event_reader_pos;
80: RADOSCB *event_rcb;
81: } BDRVRBDState;
82:
/* Short local name for the on-disk image header layout. */
typedef struct rbd_obj_header_ondisk RbdHeader1;

/* Defined further down; referenced earlier by rbd_complete_aio(). */
static void rbd_aio_bh_cb(void *opaque);
86:
/*
 * Copy the next token of src into dst.
 *
 * If delim is not '\0' and occurs in src, the token ends there: the
 * delimiter is overwritten with '\0' (src is modified) and *p is set to
 * the character following it. Otherwise the token is all of src and *p is
 * set to NULL.
 *
 * name is only used in error messages. Returns 0 on success, -EINVAL if
 * the token is empty or does not fit into dst_len bytes including the
 * terminator.
 */
static int rbd_next_tok(char *dst, int dst_len,
                        char *src, char delim,
                        const char *name,
                        char **p)
{
    char *sep = NULL;
    int tok_len;

    if (delim != '\0') {
        sep = strchr(src, delim);
    }

    if (sep) {
        *sep = '\0';
        *p = sep + 1;
    } else {
        *p = NULL;
    }

    tok_len = strlen(src);
    if (tok_len >= dst_len) {
        error_report("%s too long", name);
        return -EINVAL;
    }
    if (tok_len == 0) {
        error_report("%s too short", name);
        return -EINVAL;
    }

    pstrcpy(dst, dst_len, src);
    return 0;
}
117:
/*
 * Split a filename of the form "rbd:pool/name[@snap]" into its parts.
 *
 * pool and name are mandatory. If no snapshot part is present, snap is set
 * to the empty string. Returns 0 on success and a negative errno value if
 * the prefix is missing or a component is empty or too long.
 */
static int rbd_parsename(const char *filename,
                         char *pool, int pool_len,
                         char *snap, int snap_len,
                         char *name, int name_len)
{
    const char *spec;
    char *copy, *cursor;
    int ret;

    if (!strstart(filename, "rbd:", &spec)) {
        return -EINVAL;
    }

    /* rbd_next_tok() writes into its input, so work on a private copy */
    copy = qemu_strdup(spec);
    cursor = copy;

    ret = rbd_next_tok(pool, pool_len, cursor, '/', "pool name", &cursor);
    if (ret < 0 || cursor == NULL) {
        /* a '/' followed by the image name is required */
        ret = -EINVAL;
    } else {
        ret = rbd_next_tok(name, name_len, cursor, '@', "object name",
                           &cursor);
        if (ret >= 0) {
            if (cursor == NULL) {
                *snap = '\0';
            } else {
                ret = rbd_next_tok(snap, snap_len, cursor, '\0', "snap name",
                                   &cursor);
            }
        }
    }

    qemu_free(copy);
    return ret;
}
154:
/*
 * Build an encoded tmap operation for the key "name" with an empty value.
 *
 * Layout: one op byte, a little-endian 32-bit name length, the name bytes
 * (no terminator), and a 32-bit zero as the length of the empty value.
 * The buffer is returned through *tmap_desc and must be released with
 * free_tmap_op(). Returns the number of bytes encoded.
 */
static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
{
    const uint32_t name_len = strlen(name);
    const uint32_t name_len_le = cpu_to_le32(name_len);
    const uint32_t empty_len = 0; /* zero needs no endian conversion */
    const uint32_t total =
        1 + (sizeof(uint32_t) + name_len) + sizeof(uint32_t);
    uint8_t *out = qemu_malloc(total);
    uint32_t pos = 0;

    *tmap_desc = (char *)out;

    out[pos++] = op;

    memcpy(out + pos, &name_len_le, sizeof(name_len_le));
    pos += sizeof(name_len_le);

    memcpy(out + pos, name, name_len);
    pos += name_len;

    memcpy(out + pos, &empty_len, sizeof(empty_len));
    pos += sizeof(empty_len);

    return pos;
}
179:
/* Release a descriptor buffer that was produced by create_tmap_op(). */
static void free_tmap_op(char *tmap_desc)
{
    qemu_free(tmap_desc);
}
184:
185: static int rbd_register_image(rados_pool_t pool, const char *name)
186: {
187: char *tmap_desc;
188: const char *dir = RBD_DIRECTORY;
189: int ret;
190:
191: ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
192: if (ret < 0) {
193: return ret;
194: }
195:
196: ret = rados_tmap_update(pool, dir, tmap_desc, ret);
197: free_tmap_op(tmap_desc);
198:
199: return ret;
200: }
201:
202: static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
203: {
204: int r = rados_write(pool, info_oid, 0, NULL, 0);
205: if (r < 0) {
206: return r;
207: }
208: return 0;
209: }
210:
211: static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
212: {
213: uint64_t out[1];
214: const char *info_oid = RBD_INFO;
215:
216: *id = 0;
217:
218: int r = touch_rbd_info(pool, info_oid);
219: if (r < 0) {
220: return r;
221: }
222:
223: r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL,
224: 0, (char *)out, sizeof(out));
225: if (r < 0) {
226: return r;
227: }
228:
229: le64_to_cpus(out);
230: *id = out[0];
231:
232: return 0;
233: }
234:
235: static int rbd_create(const char *filename, QEMUOptionParameter *options)
236: {
237: int64_t bytes = 0;
238: int64_t objsize;
239: uint64_t size;
240: time_t mtime;
241: uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
242: char pool[RBD_MAX_SEG_NAME_SIZE];
243: char n[RBD_MAX_SEG_NAME_SIZE];
244: char name[RBD_MAX_OBJ_NAME_SIZE];
245: char snap_buf[RBD_MAX_SEG_NAME_SIZE];
246: char *snap = NULL;
247: RbdHeader1 header;
248: rados_pool_t p;
249: uint64_t bid;
250: uint32_t hi, lo;
251: int ret;
252:
253: if (rbd_parsename(filename,
254: pool, sizeof(pool),
255: snap_buf, sizeof(snap_buf),
256: name, sizeof(name)) < 0) {
257: return -EINVAL;
258: }
259: if (snap_buf[0] != '\0') {
260: snap = snap_buf;
261: }
262:
263: snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX);
264:
265: /* Read out options */
266: while (options && options->name) {
267: if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
268: bytes = options->value.n;
269: } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
270: if (options->value.n) {
271: objsize = options->value.n;
272: if ((objsize - 1) & objsize) { /* not a power of 2? */
273: error_report("obj size needs to be power of 2");
274: return -EINVAL;
275: }
276: if (objsize < 4096) {
277: error_report("obj size too small");
278: return -EINVAL;
279: }
280: obj_order = ffs(objsize) - 1;
281: }
282: }
283: options++;
284: }
285:
286: memset(&header, 0, sizeof(header));
287: pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
288: pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
289: pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
290: header.image_size = cpu_to_le64(bytes);
291: header.options.order = obj_order;
292: header.options.crypt_type = RBD_CRYPT_NONE;
293: header.options.comp_type = RBD_COMP_NONE;
294: header.snap_seq = 0;
295: header.snap_count = 0;
296:
297: if (rados_initialize(0, NULL) < 0) {
298: error_report("error initializing");
299: return -EIO;
300: }
301:
302: if (rados_open_pool(pool, &p)) {
303: error_report("error opening pool %s", pool);
304: rados_deinitialize();
305: return -EIO;
306: }
307:
308: /* check for existing rbd header file */
309: ret = rados_stat(p, n, &size, &mtime);
310: if (ret == 0) {
311: ret=-EEXIST;
312: goto done;
313: }
314:
315: ret = rbd_assign_bid(p, &bid);
316: if (ret < 0) {
317: error_report("failed assigning block id");
318: rados_deinitialize();
319: return -EIO;
320: }
321: hi = bid >> 32;
322: lo = bid & 0xFFFFFFFF;
323: snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);
324:
325: /* create header file */
326: ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
327: if (ret < 0) {
328: goto done;
329: }
330:
331: ret = rbd_register_image(p, name);
332: done:
333: rados_close_pool(p);
334: rados_deinitialize();
335:
336: return ret;
337: }
338:
339: /*
340: * This aio completion is being called from rbd_aio_event_reader() and
341: * runs in qemu context. It schedules a bh, but just in case the aio
342: * was not cancelled before.
343: */
344: static void rbd_complete_aio(RADOSCB *rcb)
345: {
346: RBDAIOCB *acb = rcb->acb;
347: int64_t r;
348:
349: acb->aiocnt--;
350:
351: if (acb->cancelled) {
352: if (!acb->aiocnt) {
353: qemu_vfree(acb->bounce);
354: qemu_aio_release(acb);
355: }
356: goto done;
357: }
358:
359: r = rcb->ret;
360:
361: if (acb->write) {
362: if (r < 0) {
363: acb->ret = r;
364: acb->error = 1;
365: } else if (!acb->error) {
366: acb->ret += rcb->segsize;
367: }
368: } else {
369: if (r == -ENOENT) {
370: memset(rcb->buf, 0, rcb->segsize);
371: if (!acb->error) {
372: acb->ret += rcb->segsize;
373: }
374: } else if (r < 0) {
375: memset(rcb->buf, 0, rcb->segsize);
376: acb->ret = r;
377: acb->error = 1;
378: } else if (r < rcb->segsize) {
379: memset(rcb->buf + r, 0, rcb->segsize - r);
380: if (!acb->error) {
381: acb->ret += rcb->segsize;
382: }
383: } else if (!acb->error) {
384: acb->ret += r;
385: }
386: }
387: /* Note that acb->bh can be NULL in case where the aio was cancelled */
388: if (!acb->aiocnt) {
389: acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
390: qemu_bh_schedule(acb->bh);
391: }
392: done:
393: qemu_free(rcb);
394: }
395:
396: /*
397: * aio fd read handler. It runs in the qemu context and calls the
398: * completion handling of completed rados aio operations.
399: */
400: static void rbd_aio_event_reader(void *opaque)
401: {
402: BDRVRBDState *s = opaque;
403:
404: ssize_t ret;
405:
406: do {
407: char *p = (char *)&s->event_rcb;
408:
409: /* now read the rcb pointer that was sent from a non qemu thread */
410: if ((ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
411: sizeof(s->event_rcb) - s->event_reader_pos)) > 0) {
412: if (ret > 0) {
413: s->event_reader_pos += ret;
414: if (s->event_reader_pos == sizeof(s->event_rcb)) {
415: s->event_reader_pos = 0;
416: rbd_complete_aio(s->event_rcb);
417: s->qemu_aio_count --;
418: }
419: }
420: }
421: } while (ret < 0 && errno == EINTR);
422: }
423:
424: static int rbd_aio_flush_cb(void *opaque)
425: {
426: BDRVRBDState *s = opaque;
427:
428: return (s->qemu_aio_count > 0);
429: }
430:
431:
432: static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
433: {
434: uint32_t snap_count = le32_to_cpu(header->snap_count);
435: rados_snap_t *snaps = NULL;
436: rados_snap_t seq;
437: uint32_t i;
438: uint64_t snap_names_len = le64_to_cpu(header->snap_names_len);
439: int r;
440: rados_snap_t snapid = 0;
441:
442: if (snap_count) {
443: const char *header_snap = (const char *)&header->snaps[snap_count];
444: const char *end = header_snap + snap_names_len;
445: snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count);
446:
447: for (i=0; i < snap_count; i++) {
448: snaps[i] = le64_to_cpu(header->snaps[i].id);
449:
450: if (snap && strcmp(snap, header_snap) == 0) {
451: snapid = snaps[i];
452: }
453:
454: header_snap += strlen(header_snap) + 1;
455: if (header_snap > end) {
456: error_report("bad header, snapshot list broken");
457: }
458: }
459: }
460:
461: if (snap && !snapid) {
462: error_report("snapshot not found");
463: qemu_free(snaps);
464: return -ENOENT;
465: }
466: seq = le32_to_cpu(header->snap_seq);
467:
468: r = rados_set_snap_context(pool, seq, snaps, snap_count);
469:
470: rados_set_snap(pool, snapid);
471:
472: qemu_free(snaps);
473:
474: return r;
475: }
476:
477: #define BUF_READ_START_LEN 4096
478:
479: static int rbd_read_header(BDRVRBDState *s, char **hbuf)
480: {
481: char *buf = NULL;
482: char n[RBD_MAX_SEG_NAME_SIZE];
483: uint64_t len = BUF_READ_START_LEN;
484: int r;
485:
486: snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
487:
488: buf = qemu_malloc(len);
489:
490: r = rados_read(s->header_pool, n, 0, buf, len);
491: if (r < 0) {
492: goto failed;
493: }
494:
495: if (r < len) {
496: goto done;
497: }
498:
499: qemu_free(buf);
500: buf = qemu_malloc(len);
501:
502: r = rados_stat(s->header_pool, n, &len, NULL);
503: if (r < 0) {
504: goto failed;
505: }
506:
507: r = rados_read(s->header_pool, n, 0, buf, len);
508: if (r < 0) {
509: goto failed;
510: }
511:
512: done:
513: *hbuf = buf;
514: return 0;
515:
516: failed:
517: qemu_free(buf);
518: return r;
519: }
520:
521: static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
522: {
523: BDRVRBDState *s = bs->opaque;
524: RbdHeader1 *header;
525: char pool[RBD_MAX_SEG_NAME_SIZE];
526: char snap_buf[RBD_MAX_SEG_NAME_SIZE];
527: char *snap = NULL;
528: char *hbuf = NULL;
529: int r;
530:
531: if (rbd_parsename(filename, pool, sizeof(pool),
532: snap_buf, sizeof(snap_buf),
533: s->name, sizeof(s->name)) < 0) {
534: return -EINVAL;
535: }
536: if (snap_buf[0] != '\0') {
537: snap = snap_buf;
538: }
539:
540: if ((r = rados_initialize(0, NULL)) < 0) {
541: error_report("error initializing");
542: return r;
543: }
544:
545: if ((r = rados_open_pool(pool, &s->pool))) {
546: error_report("error opening pool %s", pool);
547: rados_deinitialize();
548: return r;
549: }
550:
551: if ((r = rados_open_pool(pool, &s->header_pool))) {
552: error_report("error opening pool %s", pool);
553: rados_deinitialize();
554: return r;
555: }
556:
557: if ((r = rbd_read_header(s, &hbuf)) < 0) {
558: error_report("error reading header from %s", s->name);
559: goto failed;
560: }
561:
562: if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
563: error_report("Invalid header signature");
564: r = -EMEDIUMTYPE;
565: goto failed;
566: }
567:
568: if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
569: error_report("Unknown image version");
570: r = -EMEDIUMTYPE;
571: goto failed;
572: }
573:
574: header = (RbdHeader1 *) hbuf;
575: s->size = le64_to_cpu(header->image_size);
576: s->objsize = 1ULL << header->options.order;
577: memcpy(s->block_name, header->block_name, sizeof(header->block_name));
578:
579: r = rbd_set_snapc(s->pool, snap, header);
580: if (r < 0) {
581: error_report("failed setting snap context: %s", strerror(-r));
582: goto failed;
583: }
584:
585: bs->read_only = (snap != NULL);
586:
587: s->event_reader_pos = 0;
588: r = qemu_pipe(s->fds);
589: if (r < 0) {
590: error_report("error opening eventfd");
591: goto failed;
592: }
593: fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
594: fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
595: qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL,
596: rbd_aio_flush_cb, NULL, s);
597:
598: qemu_free(hbuf);
599:
600: return 0;
601:
602: failed:
603: qemu_free(hbuf);
604:
605: rados_close_pool(s->header_pool);
606: rados_close_pool(s->pool);
607: rados_deinitialize();
608: return r;
609: }
610:
611: static void rbd_close(BlockDriverState *bs)
612: {
613: BDRVRBDState *s = bs->opaque;
614:
615: close(s->fds[0]);
616: close(s->fds[1]);
617: qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
618: NULL);
619:
620: rados_close_pool(s->header_pool);
621: rados_close_pool(s->pool);
622: rados_deinitialize();
623: }
624:
625: /*
626: * Cancel aio. Since we don't reference acb in a non qemu threads,
627: * it is safe to access it here.
628: */
629: static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
630: {
631: RBDAIOCB *acb = (RBDAIOCB *) blockacb;
632: acb->cancelled = 1;
633: }
634:
635: static AIOPool rbd_aio_pool = {
636: .aiocb_size = sizeof(RBDAIOCB),
637: .cancel = rbd_aio_cancel,
638: };
639:
640: /*
641: * This is the callback function for rados_aio_read and _write
642: *
643: * Note: this function is being called from a non qemu thread so
644: * we need to be careful about what we do here. Generally we only
645: * write to the block notification pipe, and do the rest of the
646: * io completion handling from rbd_aio_event_reader() which
647: * runs in a qemu context.
648: */
649: static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
650: {
651: int ret;
652: rcb->ret = rados_aio_get_return_value(c);
653: rados_aio_release(c);
654: while (1) {
655: fd_set wfd;
656: int fd = rcb->s->fds[RBD_FD_WRITE];
657:
658: /* send the rcb pointer to the qemu thread that is responsible
659: for the aio completion. Must do it in a qemu thread context */
660: ret = write(fd, (void *)&rcb, sizeof(rcb));
661: if (ret >= 0) {
662: break;
663: }
664: if (errno == EINTR) {
665: continue;
666: }
667: if (errno != EAGAIN) {
668: break;
669: }
670:
671: FD_ZERO(&wfd);
672: FD_SET(fd, &wfd);
673: do {
674: ret = select(fd + 1, NULL, &wfd, NULL, NULL);
675: } while (ret < 0 && errno == EINTR);
676: }
677:
678: if (ret < 0) {
679: error_report("failed writing to acb->s->fds\n");
680: qemu_free(rcb);
681: }
682: }
683:
684: /* Callback when all queued rados_aio requests are complete */
685:
686: static void rbd_aio_bh_cb(void *opaque)
687: {
688: RBDAIOCB *acb = opaque;
689:
690: if (!acb->write) {
691: qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
692: }
693: qemu_vfree(acb->bounce);
694: acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
695: qemu_bh_delete(acb->bh);
696: acb->bh = NULL;
697:
698: qemu_aio_release(acb);
699: }
700:
701: static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
702: int64_t sector_num,
703: QEMUIOVector *qiov,
704: int nb_sectors,
705: BlockDriverCompletionFunc *cb,
706: void *opaque, int write)
707: {
708: RBDAIOCB *acb;
709: RADOSCB *rcb;
710: rados_completion_t c;
711: char n[RBD_MAX_SEG_NAME_SIZE];
712: int64_t segnr, segoffs, segsize, last_segnr;
713: int64_t off, size;
714: char *buf;
715:
716: BDRVRBDState *s = bs->opaque;
717:
718: acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
719: acb->write = write;
720: acb->qiov = qiov;
721: acb->bounce = qemu_blockalign(bs, qiov->size);
722: acb->aiocnt = 0;
723: acb->ret = 0;
724: acb->error = 0;
725: acb->s = s;
726: acb->cancelled = 0;
727: acb->bh = NULL;
728:
729: if (write) {
730: qemu_iovec_to_buffer(acb->qiov, acb->bounce);
731: }
732:
733: buf = acb->bounce;
734:
735: off = sector_num * BDRV_SECTOR_SIZE;
736: size = nb_sectors * BDRV_SECTOR_SIZE;
737: segnr = off / s->objsize;
738: segoffs = off % s->objsize;
739: segsize = s->objsize - segoffs;
740:
741: last_segnr = ((off + size - 1) / s->objsize);
742: acb->aiocnt = (last_segnr - segnr) + 1;
743:
744: s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
745:
746: while (size > 0) {
747: if (size < segsize) {
748: segsize = size;
749: }
750:
751: snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name,
752: segnr);
753:
754: rcb = qemu_malloc(sizeof(RADOSCB));
755: rcb->done = 0;
756: rcb->acb = acb;
757: rcb->segsize = segsize;
758: rcb->buf = buf;
759: rcb->s = acb->s;
760:
761: if (write) {
762: rados_aio_create_completion(rcb, NULL,
763: (rados_callback_t) rbd_finish_aiocb,
764: &c);
765: rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
766: } else {
767: rados_aio_create_completion(rcb,
768: (rados_callback_t) rbd_finish_aiocb,
769: NULL, &c);
770: rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
771: }
772:
773: buf += segsize;
774: size -= segsize;
775: segoffs = 0;
776: segsize = s->objsize;
777: segnr++;
778: }
779:
780: return &acb->common;
781: }
782:
783: static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
784: int64_t sector_num, QEMUIOVector * qiov,
785: int nb_sectors,
786: BlockDriverCompletionFunc * cb,
787: void *opaque)
788: {
789: return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
790: }
791:
792: static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
793: int64_t sector_num, QEMUIOVector * qiov,
794: int nb_sectors,
795: BlockDriverCompletionFunc * cb,
796: void *opaque)
797: {
798: return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
799: }
800:
801: static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
802: {
803: BDRVRBDState *s = bs->opaque;
804: bdi->cluster_size = s->objsize;
805: return 0;
806: }
807:
808: static int64_t rbd_getlength(BlockDriverState * bs)
809: {
810: BDRVRBDState *s = bs->opaque;
811:
812: return s->size;
813: }
814:
815: static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
816: {
817: BDRVRBDState *s = bs->opaque;
818: char inbuf[512], outbuf[128];
819: uint64_t snap_id;
820: int r;
821: char *p = inbuf;
822: char *end = inbuf + sizeof(inbuf);
823: char n[RBD_MAX_SEG_NAME_SIZE];
824: char *hbuf = NULL;
825: RbdHeader1 *header;
826:
827: if (sn_info->name[0] == '\0') {
828: return -EINVAL; /* we need a name for rbd snapshots */
829: }
830:
831: /*
832: * rbd snapshots are using the name as the user controlled unique identifier
833: * we can't use the rbd snapid for that purpose, as it can't be set
834: */
835: if (sn_info->id_str[0] != '\0' &&
836: strcmp(sn_info->id_str, sn_info->name) != 0) {
837: return -EINVAL;
838: }
839:
840: if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
841: return -ERANGE;
842: }
843:
844: r = rados_selfmanaged_snap_create(s->header_pool, &snap_id);
845: if (r < 0) {
846: error_report("failed to create snap id: %s", strerror(-r));
847: return r;
848: }
849:
850: *(uint32_t *)p = strlen(sn_info->name);
851: cpu_to_le32s((uint32_t *)p);
852: p += sizeof(uint32_t);
853: strncpy(p, sn_info->name, end - p);
854: p += strlen(p);
855: if (p + sizeof(snap_id) > end) {
856: error_report("invalid input parameter");
857: return -EINVAL;
858: }
859:
860: *(uint64_t *)p = snap_id;
861: cpu_to_le64s((uint64_t *)p);
862:
863: snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
864:
865: r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf,
866: sizeof(inbuf), outbuf, sizeof(outbuf));
867: if (r < 0) {
868: error_report("rbd.snap_add execution failed failed: %s", strerror(-r));
869: return r;
870: }
871:
872: sprintf(sn_info->id_str, "%s", sn_info->name);
873:
874: r = rbd_read_header(s, &hbuf);
875: if (r < 0) {
876: error_report("failed reading header: %s", strerror(-r));
877: return r;
878: }
879:
880: header = (RbdHeader1 *) hbuf;
881: r = rbd_set_snapc(s->pool, sn_info->name, header);
882: if (r < 0) {
883: error_report("failed setting snap context: %s", strerror(-r));
884: goto failed;
885: }
886:
887: return 0;
888:
889: failed:
890: qemu_free(header);
891: return r;
892: }
893:
/*
 * Decode a little-endian 32-bit value at *p into *v and advance *p by 4.
 * Returns 0 on success or -ERANGE if fewer than 4 bytes remain before end
 * (*p is left unchanged in that case).
 */
static int decode32(char **p, const char *end, uint32_t *v)
{
    /* compare the remaining length; never form a pointer beyond end */
    if (end - *p < 4) {
        return -ERANGE;
    }

    /* the position may be unaligned, so copy instead of casting */
    memcpy(v, *p, 4);
    le32_to_cpus(v);
    *p += 4;
    return 0;
}
905:
/*
 * Decode a little-endian 64-bit value at *p into *v and advance *p by 8.
 * Returns 0 on success or -ERANGE if fewer than 8 bytes remain before end
 * (*p is left unchanged in that case).
 */
static int decode64(char **p, const char *end, uint64_t *v)
{
    /* compare the remaining length; never form a pointer beyond end */
    if (end - *p < 8) {
        return -ERANGE;
    }

    /* the position may be unaligned, so copy instead of casting */
    memcpy(v, *p, 8);
    le64_to_cpus(v);
    *p += 8;
    return 0;
}
917:
/*
 * Decode a length-prefixed string: a little-endian 32-bit length followed
 * by that many bytes (no terminator in the input).
 *
 * On success *s points to a newly allocated, NUL-terminated copy which the
 * caller releases with qemu_free(), *p is advanced past the string and the
 * string length is returned. Returns -ERANGE, without allocating, if the
 * length prefix or the string bytes would run past end.
 */
static int decode_str(char **p, const char *end, char **s)
{
    uint32_t len;
    int r;

    if ((r = decode32(p, end, &len)) < 0) {
        return r;
    }

    /* decode32() succeeded, so *p <= end and the difference is not negative */
    if (len > (size_t)(end - *p)) {
        return -ERANGE;
    }

    *s = qemu_malloc(len + 1);
    memcpy(*s, *p, len);
    *p += len;
    (*s)[len] = '\0';

    return len;
}
934:
935: static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
936: {
937: BDRVRBDState *s = bs->opaque;
938: char n[RBD_MAX_SEG_NAME_SIZE];
939: QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
940: RbdHeader1 *header;
941: char *hbuf = NULL;
942: char *outbuf = NULL, *end, *buf;
943: uint64_t len;
944: uint64_t snap_seq;
945: uint32_t snap_count;
946: int r, i;
947:
948: /* read header to estimate how much space we need to read the snap
949: * list */
950: if ((r = rbd_read_header(s, &hbuf)) < 0) {
951: goto done_err;
952: }
953: header = (RbdHeader1 *)hbuf;
954: len = le64_to_cpu(header->snap_names_len);
955: len += 1024; /* should have already been enough, but new snapshots might
956: already been created since we read the header. just allocate
957: a bit more, so that in most cases it'll suffice anyway */
958: qemu_free(hbuf);
959:
960: snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
961: while (1) {
962: qemu_free(outbuf);
963: outbuf = qemu_malloc(len);
964:
965: r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0,
966: outbuf, len);
967: if (r < 0) {
968: error_report("rbd.snap_list execution failed failed: %s", strerror(-r));
969: goto done_err;
970: }
971: if (r != len) {
972: break;
973: }
974:
975: /* if we're here, we probably raced with some snaps creation */
976: len *= 2;
977: }
978: buf = outbuf;
979: end = buf + len;
980:
981: if ((r = decode64(&buf, end, &snap_seq)) < 0) {
982: goto done_err;
983: }
984: if ((r = decode32(&buf, end, &snap_count)) < 0) {
985: goto done_err;
986: }
987:
988: sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo));
989: for (i = 0; i < snap_count; i++) {
990: uint64_t id, image_size;
991: char *snap_name;
992:
993: if ((r = decode64(&buf, end, &id)) < 0) {
994: goto done_err;
995: }
996: if ((r = decode64(&buf, end, &image_size)) < 0) {
997: goto done_err;
998: }
999: if ((r = decode_str(&buf, end, &snap_name)) < 0) {
1000: goto done_err;
1001: }
1002:
1003: sn_info = sn_tab + i;
1004: pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1005: pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1006: qemu_free(snap_name);
1007:
1008: sn_info->vm_state_size = image_size;
1009: sn_info->date_sec = 0;
1010: sn_info->date_nsec = 0;
1011: sn_info->vm_clock_nsec = 0;
1012: }
1013: *psn_tab = sn_tab;
1014: qemu_free(outbuf);
1015: return snap_count;
1016: done_err:
1017: qemu_free(sn_tab);
1018: qemu_free(outbuf);
1019: return r;
1020: }
1021:
1022: static QEMUOptionParameter rbd_create_options[] = {
1023: {
1024: .name = BLOCK_OPT_SIZE,
1025: .type = OPT_SIZE,
1026: .help = "Virtual disk size"
1027: },
1028: {
1029: .name = BLOCK_OPT_CLUSTER_SIZE,
1030: .type = OPT_SIZE,
1031: .help = "RBD object size"
1032: },
1033: {NULL}
1034: };
1035:
1036: static BlockDriver bdrv_rbd = {
1037: .format_name = "rbd",
1038: .instance_size = sizeof(BDRVRBDState),
1039: .bdrv_file_open = rbd_open,
1040: .bdrv_close = rbd_close,
1041: .bdrv_create = rbd_create,
1042: .bdrv_get_info = rbd_getinfo,
1043: .create_options = rbd_create_options,
1044: .bdrv_getlength = rbd_getlength,
1045: .protocol_name = "rbd",
1046:
1047: .bdrv_aio_readv = rbd_aio_readv,
1048: .bdrv_aio_writev = rbd_aio_writev,
1049:
1050: .bdrv_snapshot_create = rbd_snap_create,
1051: .bdrv_snapshot_list = rbd_snap_list,
1052: };
1053:
1054: static void bdrv_rbd_init(void)
1055: {
1056: bdrv_register(&bdrv_rbd);
1057: }
1058:
1059: block_init(bdrv_rbd_init);
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.