Annotation of qemu/posix-aio-compat.c, revision 1.1.1.8

1.1       root        1: /*
                      2:  * QEMU posix-aio emulation
                      3:  *
                      4:  * Copyright IBM, Corp. 2008
                      5:  *
                      6:  * Authors:
                      7:  *  Anthony Liguori   <aliguori@us.ibm.com>
                      8:  *
                      9:  * This work is licensed under the terms of the GNU GPL, version 2.  See
                     10:  * the COPYING file in the top-level directory.
                     11:  *
1.1.1.8 ! root       12:  * Contributions after 2012-01-13 are licensed under the terms of the
        !            13:  * GNU GPL, version 2 or (at your option) any later version.
1.1       root       14:  */
                     15: 
1.1.1.2   root       16: #include <sys/ioctl.h>
1.1.1.3   root       17: #include <sys/types.h>
1.1       root       18: #include <pthread.h>
                     19: #include <unistd.h>
                     20: #include <errno.h>
                     21: #include <time.h>
                     22: #include <string.h>
                     23: #include <stdlib.h>
                     24: #include <stdio.h>
1.1.1.3   root       25: 
                     26: #include "qemu-queue.h"
1.1       root       27: #include "osdep.h"
1.1.1.5   root       28: #include "sysemu.h"
1.1.1.2   root       29: #include "qemu-common.h"
1.1.1.5   root       30: #include "trace.h"
1.1.1.3   root       31: #include "block_int.h"
                     32: 
                     33: #include "block/raw-posix-aio.h"
                     34: 
1.1.1.7   root       35: static void do_spawn_thread(void);
1.1.1.3   root       36: 
                     37: struct qemu_paiocb {
                     38:     BlockDriverAIOCB common;
                     39:     int aio_fildes;
                     40:     union {
                     41:         struct iovec *aio_iov;
1.1.1.4   root       42:         void *aio_ioctl_buf;
1.1.1.3   root       43:     };
                     44:     int aio_niov;
                     45:     size_t aio_nbytes;
                     46: #define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
                     47:     off_t aio_offset;
                     48: 
                     49:     QTAILQ_ENTRY(qemu_paiocb) node;
                     50:     int aio_type;
                     51:     ssize_t ret;
                     52:     int active;
                     53:     struct qemu_paiocb *next;
                     54: };
                     55: 
                     56: typedef struct PosixAioState {
                     57:     int rfd, wfd;
                     58:     struct qemu_paiocb *first_aio;
                     59: } PosixAioState;
1.1       root       60: 
                     61: 
                     62: static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
                     63: static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
                     64: static pthread_t thread_id;
                     65: static pthread_attr_t attr;
                     66: static int max_threads = 64;
                     67: static int cur_threads = 0;
                     68: static int idle_threads = 0;
1.1.1.7   root       69: static int new_threads = 0;     /* backlog of threads we need to create */
                     70: static int pending_threads = 0; /* threads created but not running yet */
                     71: static QEMUBH *new_thread_bh;
1.1.1.3   root       72: static QTAILQ_HEAD(, qemu_paiocb) request_list;
1.1       root       73: 
1.1.1.3   root       74: #ifdef CONFIG_PREADV
1.1.1.2   root       75: static int preadv_present = 1;
                     76: #else
                     77: static int preadv_present = 0;
                     78: #endif
                     79: 
1.1       root       80: static void die2(int err, const char *what)
                     81: {
                     82:     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
                     83:     abort();
                     84: }
                     85: 
                     86: static void die(const char *what)
                     87: {
                     88:     die2(errno, what);
                     89: }
                     90: 
                     91: static void mutex_lock(pthread_mutex_t *mutex)
                     92: {
                     93:     int ret = pthread_mutex_lock(mutex);
                     94:     if (ret) die2(ret, "pthread_mutex_lock");
                     95: }
                     96: 
                     97: static void mutex_unlock(pthread_mutex_t *mutex)
                     98: {
                     99:     int ret = pthread_mutex_unlock(mutex);
                    100:     if (ret) die2(ret, "pthread_mutex_unlock");
                    101: }
                    102: 
                    103: static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                    104:                            struct timespec *ts)
                    105: {
                    106:     int ret = pthread_cond_timedwait(cond, mutex, ts);
                    107:     if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
                    108:     return ret;
                    109: }
                    110: 
                    111: static void cond_signal(pthread_cond_t *cond)
                    112: {
                    113:     int ret = pthread_cond_signal(cond);
                    114:     if (ret) die2(ret, "pthread_cond_signal");
                    115: }
                    116: 
                    117: static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                    118:                           void *(*start_routine)(void*), void *arg)
                    119: {
                    120:     int ret = pthread_create(thread, attr, start_routine, arg);
                    121:     if (ret) die2(ret, "pthread_create");
                    122: }
                    123: 
1.1.1.3   root      124: static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
1.1.1.2   root      125: {
1.1.1.4   root      126:     int ret;
                    127: 
                    128:     ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
                    129:     if (ret == -1)
                    130:         return -errno;
1.1.1.2   root      131: 
1.1.1.4   root      132:     /*
1.1.1.8 ! root      133:      * This looks weird, but the aio code only considers a request
        !           134:      * successful if it has written the full number of bytes.
1.1.1.4   root      135:      *
                    136:      * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
                    137:      * so in fact we return the ioctl command here to make posix_aio_read()
                    138:      * happy..
                    139:      */
                    140:     return aiocb->aio_nbytes;
1.1.1.2   root      141: }
                    142: 
1.1.1.3   root      143: static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
                    144: {
                    145:     int ret;
                    146: 
                    147:     ret = qemu_fdatasync(aiocb->aio_fildes);
                    148:     if (ret == -1)
                    149:         return -errno;
                    150:     return 0;
                    151: }
                    152: 
                    153: #ifdef CONFIG_PREADV
1.1.1.2   root      154: 
                    155: static ssize_t
                    156: qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
                    157: {
                    158:     return preadv(fd, iov, nr_iov, offset);
                    159: }
                    160: 
                    161: static ssize_t
                    162: qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
                    163: {
                    164:     return pwritev(fd, iov, nr_iov, offset);
                    165: }
                    166: 
                    167: #else
                    168: 
                    169: static ssize_t
                    170: qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
                    171: {
                    172:     return -ENOSYS;
                    173: }
                    174: 
                    175: static ssize_t
                    176: qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
                    177: {
                    178:     return -ENOSYS;
                    179: }
                    180: 
                    181: #endif
                    182: 
1.1.1.3   root      183: static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
1.1.1.2   root      184: {
                    185:     ssize_t len;
                    186: 
                    187:     do {
1.1.1.3   root      188:         if (aiocb->aio_type & QEMU_AIO_WRITE)
1.1.1.2   root      189:             len = qemu_pwritev(aiocb->aio_fildes,
                    190:                                aiocb->aio_iov,
                    191:                                aiocb->aio_niov,
1.1.1.7   root      192:                                aiocb->aio_offset);
1.1.1.2   root      193:          else
                    194:             len = qemu_preadv(aiocb->aio_fildes,
                    195:                               aiocb->aio_iov,
                    196:                               aiocb->aio_niov,
1.1.1.7   root      197:                               aiocb->aio_offset);
1.1.1.2   root      198:     } while (len == -1 && errno == EINTR);
                    199: 
                    200:     if (len == -1)
                    201:         return -errno;
                    202:     return len;
                    203: }
                    204: 
1.1.1.7   root      205: /*
                    206:  * Read/writes the data to/from a given linear buffer.
                    207:  *
                    208:  * Returns the number of bytes handles or -errno in case of an error. Short
                    209:  * reads are only returned if the end of the file is reached.
                    210:  */
1.1.1.3   root      211: static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
1.1.1.2   root      212: {
1.1.1.3   root      213:     ssize_t offset = 0;
                    214:     ssize_t len;
1.1.1.2   root      215: 
                    216:     while (offset < aiocb->aio_nbytes) {
1.1.1.3   root      217:          if (aiocb->aio_type & QEMU_AIO_WRITE)
1.1.1.2   root      218:              len = pwrite(aiocb->aio_fildes,
                    219:                           (const char *)buf + offset,
                    220:                           aiocb->aio_nbytes - offset,
                    221:                           aiocb->aio_offset + offset);
                    222:          else
                    223:              len = pread(aiocb->aio_fildes,
                    224:                          buf + offset,
                    225:                          aiocb->aio_nbytes - offset,
                    226:                          aiocb->aio_offset + offset);
                    227: 
                    228:          if (len == -1 && errno == EINTR)
                    229:              continue;
                    230:          else if (len == -1) {
                    231:              offset = -errno;
                    232:              break;
                    233:          } else if (len == 0)
                    234:              break;
                    235: 
                    236:          offset += len;
                    237:     }
                    238: 
                    239:     return offset;
                    240: }
                    241: 
1.1.1.3   root      242: static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
1.1.1.2   root      243: {
1.1.1.3   root      244:     ssize_t nbytes;
1.1.1.2   root      245:     char *buf;
                    246: 
1.1.1.3   root      247:     if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
1.1.1.2   root      248:         /*
                    249:          * If there is just a single buffer, and it is properly aligned
                    250:          * we can just use plain pread/pwrite without any problems.
                    251:          */
                    252:         if (aiocb->aio_niov == 1)
                    253:              return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
                    254: 
                    255:         /*
                    256:          * We have more than one iovec, and all are properly aligned.
                    257:          *
                    258:          * Try preadv/pwritev first and fall back to linearizing the
                    259:          * buffer if it's not supported.
                    260:          */
1.1.1.4   root      261:         if (preadv_present) {
1.1.1.2   root      262:             nbytes = handle_aiocb_rw_vector(aiocb);
                    263:             if (nbytes == aiocb->aio_nbytes)
1.1.1.4   root      264:                 return nbytes;
1.1.1.2   root      265:             if (nbytes < 0 && nbytes != -ENOSYS)
                    266:                 return nbytes;
                    267:             preadv_present = 0;
                    268:         }
                    269: 
                    270:         /*
                    271:          * XXX(hch): short read/write.  no easy way to handle the reminder
                    272:          * using these interfaces.  For now retry using plain
                    273:          * pread/pwrite?
                    274:          */
                    275:     }
                    276: 
                    277:     /*
                    278:      * Ok, we have to do it the hard way, copy all segments into
                    279:      * a single aligned buffer.
                    280:      */
1.1.1.5   root      281:     buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
1.1.1.3   root      282:     if (aiocb->aio_type & QEMU_AIO_WRITE) {
1.1.1.2   root      283:         char *p = buf;
                    284:         int i;
                    285: 
                    286:         for (i = 0; i < aiocb->aio_niov; ++i) {
                    287:             memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
                    288:             p += aiocb->aio_iov[i].iov_len;
                    289:         }
                    290:     }
                    291: 
                    292:     nbytes = handle_aiocb_rw_linear(aiocb, buf);
1.1.1.3   root      293:     if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
1.1.1.2   root      294:         char *p = buf;
                    295:         size_t count = aiocb->aio_nbytes, copy;
                    296:         int i;
                    297: 
                    298:         for (i = 0; i < aiocb->aio_niov && count; ++i) {
                    299:             copy = count;
                    300:             if (copy > aiocb->aio_iov[i].iov_len)
                    301:                 copy = aiocb->aio_iov[i].iov_len;
                    302:             memcpy(aiocb->aio_iov[i].iov_base, p, copy);
                    303:             p     += copy;
                    304:             count -= copy;
                    305:         }
                    306:     }
                    307:     qemu_vfree(buf);
                    308: 
                    309:     return nbytes;
                    310: }
                    311: 
1.1.1.7   root      312: static void posix_aio_notify_event(void);
                    313: 
1.1       root      314: static void *aio_thread(void *unused)
                    315: {
1.1.1.7   root      316:     mutex_lock(&lock);
                    317:     pending_threads--;
                    318:     mutex_unlock(&lock);
                    319:     do_spawn_thread();
1.1       root      320: 
                    321:     while (1) {
                    322:         struct qemu_paiocb *aiocb;
1.1.1.3   root      323:         ssize_t ret = 0;
1.1       root      324:         qemu_timeval tv;
                    325:         struct timespec ts;
                    326: 
                    327:         qemu_gettimeofday(&tv);
                    328:         ts.tv_sec = tv.tv_sec + 10;
                    329:         ts.tv_nsec = 0;
                    330: 
                    331:         mutex_lock(&lock);
                    332: 
1.1.1.3   root      333:         while (QTAILQ_EMPTY(&request_list) &&
1.1       root      334:                !(ret == ETIMEDOUT)) {
1.1.1.6   root      335:             idle_threads++;
1.1       root      336:             ret = cond_timedwait(&cond, &lock, &ts);
1.1.1.6   root      337:             idle_threads--;
1.1       root      338:         }
                    339: 
1.1.1.3   root      340:         if (QTAILQ_EMPTY(&request_list))
1.1       root      341:             break;
                    342: 
1.1.1.3   root      343:         aiocb = QTAILQ_FIRST(&request_list);
                    344:         QTAILQ_REMOVE(&request_list, aiocb, node);
1.1       root      345:         aiocb->active = 1;
                    346:         mutex_unlock(&lock);
                    347: 
1.1.1.3   root      348:         switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
                    349:         case QEMU_AIO_READ:
1.1.1.7   root      350:             ret = handle_aiocb_rw(aiocb);
                    351:             if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
                    352:                 /* A short read means that we have reached EOF. Pad the buffer
                    353:                  * with zeros for bytes after EOF. */
                    354:                 QEMUIOVector qiov;
                    355: 
                    356:                 qemu_iovec_init_external(&qiov, aiocb->aio_iov,
                    357:                                          aiocb->aio_niov);
                    358:                 qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
                    359: 
                    360:                 ret = aiocb->aio_nbytes;
                    361:             }
                    362:             break;
1.1.1.3   root      363:         case QEMU_AIO_WRITE:
1.1.1.4   root      364:             ret = handle_aiocb_rw(aiocb);
                    365:             break;
1.1.1.3   root      366:         case QEMU_AIO_FLUSH:
1.1.1.4   root      367:             ret = handle_aiocb_flush(aiocb);
                    368:             break;
1.1.1.3   root      369:         case QEMU_AIO_IOCTL:
1.1.1.4   root      370:             ret = handle_aiocb_ioctl(aiocb);
                    371:             break;
                    372:         default:
                    373:             fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
                    374:             ret = -EINVAL;
                    375:             break;
                    376:         }
1.1       root      377: 
                    378:         mutex_lock(&lock);
1.1.1.2   root      379:         aiocb->ret = ret;
1.1       root      380:         mutex_unlock(&lock);
                    381: 
1.1.1.7   root      382:         posix_aio_notify_event();
1.1       root      383:     }
                    384: 
                    385:     cur_threads--;
                    386:     mutex_unlock(&lock);
                    387: 
                    388:     return NULL;
                    389: }
                    390: 
1.1.1.7   root      391: static void do_spawn_thread(void)
1.1       root      392: {
1.1.1.3   root      393:     sigset_t set, oldset;
                    394: 
1.1.1.7   root      395:     mutex_lock(&lock);
                    396:     if (!new_threads) {
                    397:         mutex_unlock(&lock);
                    398:         return;
                    399:     }
                    400: 
                    401:     new_threads--;
                    402:     pending_threads++;
                    403: 
                    404:     mutex_unlock(&lock);
1.1       root      405: 
1.1.1.3   root      406:     /* block all signals */
                    407:     if (sigfillset(&set)) die("sigfillset");
                    408:     if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
1.1       root      409: 
1.1.1.3   root      410:     thread_create(&thread_id, &attr, aio_thread, NULL);
1.1       root      411: 
1.1.1.3   root      412:     if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
1.1       root      413: }
                    414: 
1.1.1.7   root      415: static void spawn_thread_bh_fn(void *opaque)
                    416: {
                    417:     do_spawn_thread();
                    418: }
                    419: 
                    420: static void spawn_thread(void)
                    421: {
                    422:     cur_threads++;
                    423:     new_threads++;
                    424:     /* If there are threads being created, they will spawn new workers, so
                    425:      * we don't spend time creating many threads in a loop holding a mutex or
                    426:      * starving the current vcpu.
                    427:      *
                    428:      * If there are no idle threads, ask the main thread to create one, so we
                    429:      * inherit the correct affinity instead of the vcpu affinity.
                    430:      */
                    431:     if (!pending_threads) {
                    432:         qemu_bh_schedule(new_thread_bh);
                    433:     }
                    434: }
                    435: 
1.1.1.3   root      436: static void qemu_paio_submit(struct qemu_paiocb *aiocb)
1.1       root      437: {
                    438:     aiocb->ret = -EINPROGRESS;
                    439:     aiocb->active = 0;
                    440:     mutex_lock(&lock);
                    441:     if (idle_threads == 0 && cur_threads < max_threads)
                    442:         spawn_thread();
1.1.1.3   root      443:     QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
1.1       root      444:     mutex_unlock(&lock);
                    445:     cond_signal(&cond);
                    446: }
                    447: 
1.1.1.3   root      448: static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
1.1       root      449: {
                    450:     ssize_t ret;
                    451: 
                    452:     mutex_lock(&lock);
                    453:     ret = aiocb->ret;
                    454:     mutex_unlock(&lock);
                    455: 
                    456:     return ret;
                    457: }
                    458: 
1.1.1.3   root      459: static int qemu_paio_error(struct qemu_paiocb *aiocb)
1.1       root      460: {
                    461:     ssize_t ret = qemu_paio_return(aiocb);
                    462: 
                    463:     if (ret < 0)
                    464:         ret = -ret;
                    465:     else
                    466:         ret = 0;
                    467: 
                    468:     return ret;
                    469: }
                    470: 
1.1.1.8 ! root      471: static void posix_aio_read(void *opaque)
1.1       root      472: {
1.1.1.3   root      473:     PosixAioState *s = opaque;
                    474:     struct qemu_paiocb *acb, **pacb;
1.1       root      475:     int ret;
1.1.1.8 ! root      476:     ssize_t len;
        !           477: 
        !           478:     /* read all bytes from signal pipe */
        !           479:     for (;;) {
        !           480:         char bytes[16];
        !           481: 
        !           482:         len = read(s->rfd, bytes, sizeof(bytes));
        !           483:         if (len == -1 && errno == EINTR)
        !           484:             continue; /* try again */
        !           485:         if (len == sizeof(bytes))
        !           486:             continue; /* more to read */
        !           487:         break;
        !           488:     }
1.1.1.3   root      489: 
                    490:     for(;;) {
                    491:         pacb = &s->first_aio;
                    492:         for(;;) {
                    493:             acb = *pacb;
                    494:             if (!acb)
1.1.1.8 ! root      495:                 return;
1.1.1.3   root      496: 
                    497:             ret = qemu_paio_error(acb);
                    498:             if (ret == ECANCELED) {
                    499:                 /* remove the request */
                    500:                 *pacb = acb->next;
                    501:                 qemu_aio_release(acb);
                    502:             } else if (ret != EINPROGRESS) {
                    503:                 /* end of aio */
                    504:                 if (ret == 0) {
                    505:                     ret = qemu_paio_return(acb);
                    506:                     if (ret == acb->aio_nbytes)
                    507:                         ret = 0;
                    508:                     else
                    509:                         ret = -EINVAL;
                    510:                 } else {
                    511:                     ret = -ret;
                    512:                 }
1.1.1.6   root      513: 
                    514:                 trace_paio_complete(acb, acb->common.opaque, ret);
                    515: 
1.1.1.3   root      516:                 /* remove the request */
                    517:                 *pacb = acb->next;
                    518:                 /* call the callback */
                    519:                 acb->common.cb(acb->common.opaque, ret);
                    520:                 qemu_aio_release(acb);
                    521:                 break;
                    522:             } else {
                    523:                 pacb = &acb->next;
                    524:             }
                    525:         }
                    526:     }
                    527: }
                    528: 
                    529: static int posix_aio_flush(void *opaque)
                    530: {
                    531:     PosixAioState *s = opaque;
                    532:     return !!s->first_aio;
                    533: }
                    534: 
                    535: static PosixAioState *posix_aio_state;
                    536: 
1.1.1.7   root      537: static void posix_aio_notify_event(void)
1.1.1.3   root      538: {
1.1.1.7   root      539:     char byte = 0;
                    540:     ssize_t ret;
1.1.1.3   root      541: 
1.1.1.7   root      542:     ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
                    543:     if (ret < 0 && errno != EAGAIN)
                    544:         die("write()");
1.1.1.3   root      545: }
                    546: 
                    547: static void paio_remove(struct qemu_paiocb *acb)
                    548: {
                    549:     struct qemu_paiocb **pacb;
                    550: 
                    551:     /* remove the callback from the queue */
                    552:     pacb = &posix_aio_state->first_aio;
                    553:     for(;;) {
                    554:         if (*pacb == NULL) {
                    555:             fprintf(stderr, "paio_remove: aio request not found!\n");
                    556:             break;
                    557:         } else if (*pacb == acb) {
                    558:             *pacb = acb->next;
                    559:             qemu_aio_release(acb);
                    560:             break;
                    561:         }
                    562:         pacb = &(*pacb)->next;
                    563:     }
                    564: }
                    565: 
                    566: static void paio_cancel(BlockDriverAIOCB *blockacb)
                    567: {
                    568:     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
                    569:     int active = 0;
1.1       root      570: 
1.1.1.6   root      571:     trace_paio_cancel(acb, acb->common.opaque);
                    572: 
1.1       root      573:     mutex_lock(&lock);
1.1.1.3   root      574:     if (!acb->active) {
                    575:         QTAILQ_REMOVE(&request_list, acb, node);
                    576:         acb->ret = -ECANCELED;
                    577:     } else if (acb->ret == -EINPROGRESS) {
                    578:         active = 1;
                    579:     }
1.1       root      580:     mutex_unlock(&lock);
                    581: 
1.1.1.3   root      582:     if (active) {
                    583:         /* fail safe: if the aio could not be canceled, we wait for
                    584:            it */
                    585:         while (qemu_paio_error(acb) == EINPROGRESS)
                    586:             ;
                    587:     }
                    588: 
                    589:     paio_remove(acb);
                    590: }
                    591: 
                    592: static AIOPool raw_aio_pool = {
                    593:     .aiocb_size         = sizeof(struct qemu_paiocb),
                    594:     .cancel             = paio_cancel,
                    595: };
                    596: 
                    597: BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
                    598:         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
                    599:         BlockDriverCompletionFunc *cb, void *opaque, int type)
                    600: {
                    601:     struct qemu_paiocb *acb;
                    602: 
                    603:     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
                    604:     acb->aio_type = type;
                    605:     acb->aio_fildes = fd;
                    606: 
                    607:     if (qiov) {
                    608:         acb->aio_iov = qiov->iov;
                    609:         acb->aio_niov = qiov->niov;
                    610:     }
                    611:     acb->aio_nbytes = nb_sectors * 512;
                    612:     acb->aio_offset = sector_num * 512;
                    613: 
                    614:     acb->next = posix_aio_state->first_aio;
                    615:     posix_aio_state->first_aio = acb;
                    616: 
1.1.1.5   root      617:     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
1.1.1.3   root      618:     qemu_paio_submit(acb);
                    619:     return &acb->common;
                    620: }
                    621: 
                    622: BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
                    623:         unsigned long int req, void *buf,
                    624:         BlockDriverCompletionFunc *cb, void *opaque)
                    625: {
                    626:     struct qemu_paiocb *acb;
                    627: 
                    628:     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
                    629:     acb->aio_type = QEMU_AIO_IOCTL;
                    630:     acb->aio_fildes = fd;
                    631:     acb->aio_offset = 0;
                    632:     acb->aio_ioctl_buf = buf;
                    633:     acb->aio_ioctl_cmd = req;
                    634: 
                    635:     acb->next = posix_aio_state->first_aio;
                    636:     posix_aio_state->first_aio = acb;
                    637: 
                    638:     qemu_paio_submit(acb);
                    639:     return &acb->common;
                    640: }
                    641: 
                    642: int paio_init(void)
                    643: {
                    644:     PosixAioState *s;
                    645:     int fds[2];
                    646:     int ret;
                    647: 
                    648:     if (posix_aio_state)
                    649:         return 0;
                    650: 
1.1.1.7   root      651:     s = g_malloc(sizeof(PosixAioState));
1.1.1.3   root      652: 
                    653:     s->first_aio = NULL;
                    654:     if (qemu_pipe(fds) == -1) {
                    655:         fprintf(stderr, "failed to create pipe\n");
1.1.1.7   root      656:         g_free(s);
1.1.1.3   root      657:         return -1;
                    658:     }
                    659: 
                    660:     s->rfd = fds[0];
                    661:     s->wfd = fds[1];
                    662: 
                    663:     fcntl(s->rfd, F_SETFL, O_NONBLOCK);
                    664:     fcntl(s->wfd, F_SETFL, O_NONBLOCK);
                    665: 
1.1.1.8 ! root      666:     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
1.1.1.3   root      667: 
                    668:     ret = pthread_attr_init(&attr);
                    669:     if (ret)
                    670:         die2(ret, "pthread_attr_init");
                    671: 
                    672:     ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
                    673:     if (ret)
                    674:         die2(ret, "pthread_attr_setdetachstate");
                    675: 
                    676:     QTAILQ_INIT(&request_list);
1.1.1.7   root      677:     new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
1.1.1.3   root      678: 
                    679:     posix_aio_state = s;
                    680:     return 0;
1.1       root      681: }

unix.superglobalmegacorp.com