File:  [Qemu by Fabrice Bellard] / qemu / posix-aio-compat.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs
Tue Apr 24 16:50:31 2018 UTC (3 years, 1 month ago) by root
Branches: qemu, MAIN
CVS tags: qemu0105, qemu0104, qemu0103, qemu0102, qemu0101, qemu0100, HEAD
qemu 0.10.0

    1: /*
    2:  * QEMU posix-aio emulation
    3:  *
    4:  * Copyright IBM, Corp. 2008
    5:  *
    6:  * Authors:
    7:  *  Anthony Liguori   <aliguori@us.ibm.com>
    8:  *
    9:  * This work is licensed under the terms of the GNU GPL, version 2.  See
   10:  * the COPYING file in the top-level directory.
   11:  *
   12:  */
   13: 
   14: #include <pthread.h>
   15: #include <unistd.h>
   16: #include <errno.h>
   17: #include <time.h>
   18: #include <string.h>
   19: #include <stdlib.h>
   20: #include <stdio.h>
   21: #include "osdep.h"
   22: 
   23: #include "posix-aio-compat.h"
   24: 
   25: static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
   26: static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
   27: static pthread_t thread_id;
   28: static pthread_attr_t attr;
   29: static int max_threads = 64;
   30: static int cur_threads = 0;
   31: static int idle_threads = 0;
   32: static TAILQ_HEAD(, qemu_paiocb) request_list;
   33: 
   34: static void die2(int err, const char *what)
   35: {
   36:     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
   37:     abort();
   38: }
   39: 
   40: static void die(const char *what)
   41: {
   42:     die2(errno, what);
   43: }
   44: 
   45: static void mutex_lock(pthread_mutex_t *mutex)
   46: {
   47:     int ret = pthread_mutex_lock(mutex);
   48:     if (ret) die2(ret, "pthread_mutex_lock");
   49: }
   50: 
   51: static void mutex_unlock(pthread_mutex_t *mutex)
   52: {
   53:     int ret = pthread_mutex_unlock(mutex);
   54:     if (ret) die2(ret, "pthread_mutex_unlock");
   55: }
   56: 
   57: static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
   58:                            struct timespec *ts)
   59: {
   60:     int ret = pthread_cond_timedwait(cond, mutex, ts);
   61:     if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
   62:     return ret;
   63: }
   64: 
   65: static void cond_signal(pthread_cond_t *cond)
   66: {
   67:     int ret = pthread_cond_signal(cond);
   68:     if (ret) die2(ret, "pthread_cond_signal");
   69: }
   70: 
   71: static void thread_create(pthread_t *thread, pthread_attr_t *attr,
   72:                           void *(*start_routine)(void*), void *arg)
   73: {
   74:     int ret = pthread_create(thread, attr, start_routine, arg);
   75:     if (ret) die2(ret, "pthread_create");
   76: }
   77: 
   78: static void *aio_thread(void *unused)
   79: {
   80:     pid_t pid;
   81:     sigset_t set;
   82: 
   83:     pid = getpid();
   84: 
   85:     /* block all signals */
   86:     if (sigfillset(&set)) die("sigfillset");
   87:     if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
   88: 
   89:     while (1) {
   90:         struct qemu_paiocb *aiocb;
   91:         size_t offset;
   92:         int ret = 0;
   93:         qemu_timeval tv;
   94:         struct timespec ts;
   95: 
   96:         qemu_gettimeofday(&tv);
   97:         ts.tv_sec = tv.tv_sec + 10;
   98:         ts.tv_nsec = 0;
   99: 
  100:         mutex_lock(&lock);
  101: 
  102:         while (TAILQ_EMPTY(&request_list) &&
  103:                !(ret == ETIMEDOUT)) {
  104:             ret = cond_timedwait(&cond, &lock, &ts);
  105:         }
  106: 
  107:         if (TAILQ_EMPTY(&request_list))
  108:             break;
  109: 
  110:         aiocb = TAILQ_FIRST(&request_list);
  111:         TAILQ_REMOVE(&request_list, aiocb, node);
  112: 
  113:         offset = 0;
  114:         aiocb->active = 1;
  115: 
  116:         idle_threads--;
  117:         mutex_unlock(&lock);
  118: 
  119:         while (offset < aiocb->aio_nbytes) {
  120:             ssize_t len;
  121: 
  122:             if (aiocb->is_write)
  123:                 len = pwrite(aiocb->aio_fildes,
  124:                              (const char *)aiocb->aio_buf + offset,
  125:                              aiocb->aio_nbytes - offset,
  126:                              aiocb->aio_offset + offset);
  127:             else
  128:                 len = pread(aiocb->aio_fildes,
  129:                             (char *)aiocb->aio_buf + offset,
  130:                             aiocb->aio_nbytes - offset,
  131:                             aiocb->aio_offset + offset);
  132: 
  133:             if (len == -1 && errno == EINTR)
  134:                 continue;
  135:             else if (len == -1) {
  136:                 offset = -errno;
  137:                 break;
  138:             } else if (len == 0)
  139:                 break;
  140: 
  141:             offset += len;
  142:         }
  143: 
  144:         mutex_lock(&lock);
  145:         aiocb->ret = offset;
  146:         idle_threads++;
  147:         mutex_unlock(&lock);
  148: 
  149:         if (kill(pid, aiocb->ev_signo)) die("kill failed");
  150:     }
  151: 
  152:     idle_threads--;
  153:     cur_threads--;
  154:     mutex_unlock(&lock);
  155: 
  156:     return NULL;
  157: }
  158: 
  159: static void spawn_thread(void)
  160: {
  161:     cur_threads++;
  162:     idle_threads++;
  163:     thread_create(&thread_id, &attr, aio_thread, NULL);
  164: }
  165: 
  166: int qemu_paio_init(struct qemu_paioinit *aioinit)
  167: {
  168:     int ret;
  169: 
  170:     ret = pthread_attr_init(&attr);
  171:     if (ret) die2(ret, "pthread_attr_init");
  172: 
  173:     ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  174:     if (ret) die2(ret, "pthread_attr_setdetachstate");
  175: 
  176:     TAILQ_INIT(&request_list);
  177: 
  178:     return 0;
  179: }
  180: 
  181: static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
  182: {
  183:     aiocb->is_write = is_write;
  184:     aiocb->ret = -EINPROGRESS;
  185:     aiocb->active = 0;
  186:     mutex_lock(&lock);
  187:     if (idle_threads == 0 && cur_threads < max_threads)
  188:         spawn_thread();
  189:     TAILQ_INSERT_TAIL(&request_list, aiocb, node);
  190:     mutex_unlock(&lock);
  191:     cond_signal(&cond);
  192: 
  193:     return 0;
  194: }
  195: 
  196: int qemu_paio_read(struct qemu_paiocb *aiocb)
  197: {
  198:     return qemu_paio_submit(aiocb, 0);
  199: }
  200: 
  201: int qemu_paio_write(struct qemu_paiocb *aiocb)
  202: {
  203:     return qemu_paio_submit(aiocb, 1);
  204: }
  205: 
  206: ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
  207: {
  208:     ssize_t ret;
  209: 
  210:     mutex_lock(&lock);
  211:     ret = aiocb->ret;
  212:     mutex_unlock(&lock);
  213: 
  214:     return ret;
  215: }
  216: 
  217: int qemu_paio_error(struct qemu_paiocb *aiocb)
  218: {
  219:     ssize_t ret = qemu_paio_return(aiocb);
  220: 
  221:     if (ret < 0)
  222:         ret = -ret;
  223:     else
  224:         ret = 0;
  225: 
  226:     return ret;
  227: }
  228: 
  229: int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
  230: {
  231:     int ret;
  232: 
  233:     mutex_lock(&lock);
  234:     if (!aiocb->active) {
  235:         TAILQ_REMOVE(&request_list, aiocb, node);
  236:         aiocb->ret = -ECANCELED;
  237:         ret = QEMU_PAIO_CANCELED;
  238:     } else if (aiocb->ret == -EINPROGRESS)
  239:         ret = QEMU_PAIO_NOTCANCELED;
  240:     else
  241:         ret = QEMU_PAIO_ALLDONE;
  242:     mutex_unlock(&lock);
  243: 
  244:     return ret;
  245: }

unix.superglobalmegacorp.com