Annotation of XNU/osfmk/ipc/ipc_mqueue.c, revision 1.1.1.1

1.1       root        1: /*
                      2:  * Copyright (c) 2000 Apple Computer, Inc. All rights reserved.
                      3:  *
                      4:  * @APPLE_LICENSE_HEADER_START@
                      5:  * 
                      6:  * The contents of this file constitute Original Code as defined in and
                      7:  * are subject to the Apple Public Source License Version 1.1 (the
                      8:  * "License").  You may not use this file except in compliance with the
                      9:  * License.  Please obtain a copy of the License at
                     10:  * http://www.apple.com/publicsource and read it before using this file.
                     11:  * 
                     12:  * This Original Code and all software distributed under the License are
                     13:  * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
                     14:  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
                     15:  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
                     16:  * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT.  Please see the
                     17:  * License for the specific language governing rights and limitations
                     18:  * under the License.
                     19:  * 
                     20:  * @APPLE_LICENSE_HEADER_END@
                     21:  */
                     22: /*
                     23:  * @OSF_FREE_COPYRIGHT@
                     24:  */
                     25: /* 
                     26:  * Mach Operating System
                     27:  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
                     28:  * All Rights Reserved.
                     29:  * 
                     30:  * Permission to use, copy, modify and distribute this software and its
                     31:  * documentation is hereby granted, provided that both the copyright
                     32:  * notice and this permission notice appear in all copies of the
                     33:  * software, derivative works or modified versions, and any portions
                     34:  * thereof, and that both notices appear in supporting documentation.
                     35:  * 
                     36:  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
                     37:  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
                     38:  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
                     39:  * 
                     40:  * Carnegie Mellon requests users of this software to return to
                     41:  * 
                     42:  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
                     43:  *  School of Computer Science
                     44:  *  Carnegie Mellon University
                     45:  *  Pittsburgh PA 15213-3890
                     46:  * 
                     47:  * any improvements or extensions that they make and grant Carnegie Mellon
                     48:  * the rights to redistribute these changes.
                     49:  */
                     50: /*
                     51:  */
                     52: /*
                     53:  *     File:   ipc/ipc_mqueue.c
                     54:  *     Author: Rich Draves
                     55:  *     Date:   1989
                     56:  *
                     57:  *     Functions to manipulate IPC message queues.
                     58:  */
                     59: 
                     60: #include <dipc.h>
                     61: 
                     62: #include <mach/port.h>
                     63: #include <mach/message.h>
                     64: #include <mach/sync_policy.h>
                     65: 
                     66: #include <kern/assert.h>
                     67: #include <kern/counters.h>
                     68: #include <kern/sched_prim.h>
                     69: #include <kern/ipc_kobject.h>
                     70: #include <kern/misc_protos.h>
                     71: #include <kern/task.h>
                     72: #include <kern/thread.h>
                     73: #include <kern/wait_queue.h>
                     74: 
                     75: #include <ipc/ipc_mqueue.h>
                     76: #include <ipc/ipc_kmsg.h>
                     77: #include <ipc/ipc_port.h>
                     78: #include <ipc/ipc_pset.h>
                     79: #include <ipc/ipc_space.h>
                     80: 
                     81: #include <ddb/tr.h>
                     82: 
                     83: int ipc_mqueue_full;           /* address is event for queue space */
                     84: int ipc_mqueue_rcv;            /* address is event for message arrival */
                     85: 
                     86: #define TR_ENABLE 0
                     87: 
                     88: /*
                     89:  *     Routine:        ipc_mqueue_init
                     90:  *     Purpose:
                     91:  *             Initialize a newly-allocated message queue.
                     92:  */
                     93: void
                     94: ipc_mqueue_init(
                     95:        ipc_mqueue_t    mqueue,
                     96:        boolean_t       is_set)
                     97: {
                     98:        if (is_set) {
                     99:                wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
                    100:        } else {
                    101:                wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
                    102:                ipc_kmsg_queue_init(&mqueue->imq_messages);
                    103:                mqueue->imq_seqno = 0;
                    104:                mqueue->imq_msgcount = 0;
                    105:                mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
                    106:                mqueue->imq_fullwaiters = FALSE;
                    107:        }
                    108: }
                    109: 
                    110: /*
                    111:  *     Routine:        ipc_mqueue_member
                    112:  *     Purpose:
                    113:  *             Indicate whether the (port) mqueue is a member of
                    114:  *             this portset's mqueue.  We do this by checking
                    115:  *             whether the portset mqueue's waitq is an member of
                    116:  *             the port's mqueue waitq.
                    117:  *     Conditions:
                    118:  *             the portset's mqueue is not already a member
                    119:  *             this may block while allocating linkage structures.
                    120:  */
                    121: 
                    122: boolean_t
                    123: ipc_mqueue_member(
                    124:        ipc_mqueue_t    port_mqueue,
                    125:        ipc_mqueue_t    set_mqueue)
                    126: {
                    127:        wait_queue_t    port_waitq = &port_mqueue->imq_wait_queue;
                    128:        wait_queue_t    set_waitq = &set_mqueue->imq_wait_queue;
                    129: 
                    130:        return (wait_queue_member(port_waitq, set_waitq));
                    131: 
                    132: }
                    133: 
                    134: /*
                    135:  *     Routine:        ipc_mqueue_remove
                    136:  *     Purpose:
                    137:  *             Remove the association between the queue and the specified
                    138:  *             subordinate message queue.
                    139:  */
                    140: 
                    141: kern_return_t
                    142: ipc_mqueue_remove(
                    143:        ipc_mqueue_t     mqueue,
                    144:        ipc_mqueue_t     sub_mqueue)
                    145: {
                    146:        wait_queue_t     mq_waitq = &mqueue->imq_wait_queue;
                    147:        wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue;
                    148: 
                    149:        if (wait_queue_member(mq_waitq, sub_waitq)) {
                    150:            wait_queue_unlink(mq_waitq, sub_waitq);
                    151:            return KERN_SUCCESS;
                    152:        }
                    153:        return KERN_NOT_IN_SET;    
                    154: }
                    155: 
                    156: /*
                    157:  *     Routine:        ipc_mqueue_remove_one
                    158:  *     Purpose:
                    159:  *             Find and remove one subqueue from the queue.
                    160:  *     Conditions:
                    161:  *             Will return the set mqueue that was removed
                    162:  */
                    163: void
                    164: ipc_mqueue_remove_one(
                    165:        ipc_mqueue_t    mqueue,
                    166:        ipc_mqueue_t    *sub_queuep)
                    167: {
                    168:        wait_queue_t    mq_waitq = &mqueue->imq_wait_queue;
                    169: 
                    170:        wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep);
                    171:        return;
                    172: }
                    173: 
                    174: 
                    175: /*
                    176:  *     Routine:        ipc_mqueue_add
                    177:  *     Purpose:
                    178:  *             Associate the portset's mqueue with the port's mqueue.
                    179:  *             This has to be done so that posting the port will wakeup
                    180:  *             a portset waiter.  If there are waiters on the portset
                    181:  *             mqueue and messages on the port mqueue, try to match them
                    182:  *             up now.
                    183:  *     Conditions:
                    184:  *             May block.
                    185:  */
                    186: kern_return_t
                    187: ipc_mqueue_add(
                    188:        ipc_mqueue_t     port_mqueue,
                    189:        ipc_mqueue_t     set_mqueue)
                    190: {
                    191:        wait_queue_t     port_waitq = &port_mqueue->imq_wait_queue;
                    192:        wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue;
                    193:        ipc_kmsg_queue_t kmsgq;
                    194:        ipc_kmsg_t       kmsg, next;
                    195:        kern_return_t    kr;
                    196: 
                    197:        kr = wait_queue_link(port_waitq, set_waitq);
                    198:        if (kr != KERN_SUCCESS)
                    199:                return kr;
                    200: 
                    201:        /*
                    202:         * Now that the set has been added to the port, there may be
                    203:         * messages queued on the port and threads waiting on the set
                    204:         * waitq.  Lets get them together.
                    205:         */
                    206:        imq_lock(port_mqueue);
                    207:        kmsgq = &port_mqueue->imq_messages;
                    208:        for (kmsg = ipc_kmsg_queue_first(kmsgq);
                    209:             kmsg != IKM_NULL;
                    210:             kmsg = next) {
                    211:                next = ipc_kmsg_queue_next(kmsgq, kmsg);
                    212: 
                    213:                for (;;) {
                    214:                        thread_t th;
                    215:                        spl_t s;
                    216: 
                    217:                        s = splsched();
                    218:                        th = wait_queue_wakeup_identity_locked(port_waitq,
                    219:                                                                                                   IPC_MQUEUE_RECEIVE,
                    220:                                                                                                   THREAD_AWAKENED,
                    221:                                                                                                   FALSE);
                    222:                        /* waitq/mqueue still locked, thread locked */
                    223: 
                    224:                        if (th == THREAD_NULL) {
                    225:                                splx(s);
                    226:                                goto leave;
                    227:                        }
                    228: 
                    229:                        /*
                    230:                         * Found a receiver. see if they can handle the message
                    231:                         * correctly (the message is not too large for them, or
                    232:                         * they didn't care to be informed that the message was
                    233:                         * too large).  If they can't handle it, take them off
                    234:                         * the list and let them go back and figure it out and
                    235:                         * just move onto the next.
                    236:                         */
                    237:                        if (th->ith_msize <
                    238:                            kmsg->ikm_header.msgh_size +
                    239:                            REQUESTED_TRAILER_SIZE(th->ith_option)) {
                    240:                                th->ith_state = MACH_RCV_TOO_LARGE;
                    241:                                th->ith_msize = kmsg->ikm_header.msgh_size;
                    242:                                if (th->ith_option & MACH_RCV_LARGE) {
                    243:                                        /*
                    244:                                         * let him go without message
                    245:                                         */
                    246:                                        th->ith_kmsg = IKM_NULL;
                    247:                                        th->ith_seqno = 0;
                    248:                                        thread_unlock(th);
                    249:                                        splx(s);
                    250:                                        continue; /* find another thread */
                    251:                                }
                    252:                        } else {
                    253:                                th->ith_state = MACH_MSG_SUCCESS;
                    254:                        }
                    255: 
                    256:                        /*
                    257:                         * This thread is going to take this message,
                    258:                         * so give it to him.
                    259:                         */
                    260:                        ipc_mqueue_release_msgcount(port_mqueue);
                    261:                        ipc_kmsg_rmqueue(kmsgq, kmsg);
                    262:                        th->ith_kmsg = kmsg;
                    263:                        th->ith_seqno = port_mqueue->imq_seqno++;
                    264:                        thread_unlock(th);
                    265:                        splx(s);
                    266:                        break;  /* go to next message */
                    267:                }
                    268:                        
                    269:        }
                    270:  leave:
                    271:        imq_unlock(port_mqueue);
                    272:        return KERN_SUCCESS;
                    273: }
                    274: 
                    275: /*
                    276:  *     Routine:        ipc_mqueue_changed
                    277:  *     Purpose:
                    278:  *             Wake up receivers waiting in a message queue.
                    279:  *     Conditions:
                    280:  *             The message queue is locked.
                    281:  */
                    282: 
                    283: void
                    284: ipc_mqueue_changed(
                    285:        ipc_mqueue_t            mqueue)
                    286: {
                    287:        wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
                    288:                                                                 IPC_MQUEUE_RECEIVE,
                    289:                                                                 THREAD_RESTART,
                    290:                                                                 FALSE);                /* unlock waitq? */
                    291: }
                    292: 
                    293: 
                    294:                
                    295: 
                    296: /*
                    297:  *     Routine:        ipc_mqueue_send
                    298:  *     Purpose:
                    299:  *             Send a message to a message queue.  The message holds a reference
                    300:  *             for the destination port for this message queue in the 
                    301:  *             msgh_remote_port field.
                    302:  *
                    303:  *             If unsuccessful, the caller still has possession of
                    304:  *             the message and must do something with it.  If successful,
                    305:  *             the message is queued, given to a receiver, or destroyed.
                    306:  *     Conditions:
                    307:  *             Nothing locked.
                    308:  *     Returns:
                    309:  *             MACH_MSG_SUCCESS        The message was accepted.
                    310:  *             MACH_SEND_TIMED_OUT     Caller still has message.
                    311:  *             MACH_SEND_INTERRUPTED   Caller still has message.
                    312:  */
                    313: mach_msg_return_t
                    314: ipc_mqueue_send(
                    315:        ipc_mqueue_t            mqueue,
                    316:        ipc_kmsg_t                      kmsg,
                    317:        mach_msg_option_t       option,
                    318:        mach_msg_timeout_t      timeout)
                    319: {
                    320:    int save_wait_result;
                    321: 
                    322:        /*
                    323:         *  Don't block if:
                    324:         *      1) We're under the queue limit.
                    325:         *      2) Caller used the MACH_SEND_ALWAYS internal option.
                    326:         *      3) Message is sent to a send-once right.
                    327:         */
                    328:        imq_lock(mqueue);
                    329: 
                    330:        if (!imq_full(mqueue) ||
                    331:                (option & MACH_SEND_ALWAYS) ||
                    332:                (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
                    333:                 MACH_MSG_TYPE_PORT_SEND_ONCE)) {
                    334:                mqueue->imq_msgcount++;
                    335:                imq_unlock(mqueue);
                    336: 
                    337:        } else {
                    338: 
                    339:                /* 
                    340:                 * We have to wait for space to be granted to us.
                    341:                 */
                    342:                if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) {
                    343:                        imq_unlock(mqueue);
                    344:                        return MACH_SEND_TIMED_OUT;
                    345:                }
                    346:                mqueue->imq_fullwaiters = TRUE;
                    347:                wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
                    348:                                                                          IPC_MQUEUE_FULL,
                    349:                                                                          THREAD_ABORTSAFE,
                    350:                                                                          TRUE); /* unlock? */
                    351:                /* wait/mqueue is unlocked */
                    352:                
                    353:                if (option & MACH_SEND_TIMEOUT)
                    354:                        thread_set_timer(timeout, 1000*NSEC_PER_USEC);
                    355: 
                    356:                counter(c_ipc_mqueue_send_block++);
                    357:                save_wait_result = thread_block((void (*)(void)) 0);
                    358:                
                    359:                if (option & MACH_SEND_TIMEOUT)
                    360:                        thread_cancel_timer();
                    361: 
                    362: 
                    363:                switch (save_wait_result) {
                    364:                case THREAD_TIMED_OUT:
                    365:                        assert(option & MACH_SEND_TIMEOUT);
                    366:                        return MACH_SEND_TIMED_OUT;
                    367:                        
                    368:                case THREAD_AWAKENED:
                    369:                        break;          /* we can proceed - inherited msgcount from waker */
                    370:                        
                    371:                case THREAD_INTERRUPTED:
                    372:                        return MACH_SEND_INTERRUPTED;
                    373:                        
                    374:                case THREAD_RESTART:
                    375:                default:
                    376:                        panic("ipc_mqueue_send");
                    377:                }
                    378:        }
                    379: 
                    380:        ipc_mqueue_post(mqueue, kmsg);
                    381:        return MACH_MSG_SUCCESS;
                    382: }
                    383: 
                    384: /*
                    385:  *     Routine:        ipc_mqueue_release_msgcount
                    386:  *     Purpose:
                    387:  *             Release a message queue reference in the case where we
                    388:  *             found a waiter.
                    389:  *
                    390:  *     Conditions:
                    391:  *             The message queue is locked
                    392:  */
                    393: void
                    394: ipc_mqueue_release_msgcount(
                    395:        ipc_mqueue_t mqueue)    
                    396: {
                    397:        assert(imq_held(mqueue));
                    398:        assert(mqueue->imq_msgcount > 0);
                    399: 
                    400:        mqueue->imq_msgcount--;
                    401:        if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
                    402:                        if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
                    403:                                                                                         IPC_MQUEUE_FULL,
                    404:                                                                                         THREAD_AWAKENED,
                    405:                                                                                         FALSE) != KERN_SUCCESS) {
                    406:                                        mqueue->imq_fullwaiters = FALSE;
                    407:                        } else {
                    408:                                        mqueue->imq_msgcount++;  /* gave it away */
                    409:                        }
                    410:        }
                    411: }
                    412: 
                    413: /*
                    414:  *     Routine:        ipc_mqueue_post
                    415:  *     Purpose:
                    416:  *             Post a message to a waiting receiver or enqueue it.  If a
                    417:  *             receiver is waiting, we can release our reserved space in
                    418:  *             the message queue.
                    419:  *
                    420:  *     Conditions:
                    421:  *             If we need to queue, our space in the message queue is reserved.
                    422:  */
                    423: void
                    424: ipc_mqueue_post(
                    425:        register ipc_mqueue_t   mqueue,
                    426:        register ipc_kmsg_t             kmsg)
                    427: {
                    428: 
                    429:        /*
                    430:         *      While the msg queue     is locked, we have control of the
                    431:         *  kmsg, so the ref in it for the port is still good.
                    432:         *
                    433:         *      Check for a receiver for the message.
                    434:         */
                    435:        imq_lock(mqueue);
                    436:        for (;;) {
                    437:                wait_queue_t waitq = &mqueue->imq_wait_queue;
                    438:                thread_t receiver;
                    439:                spl_t s;
                    440: 
                    441:                s = splsched();
                    442:                receiver = wait_queue_wakeup_identity_locked(waitq,
                    443:                                                                                                         IPC_MQUEUE_RECEIVE,
                    444:                                                                                                         THREAD_AWAKENED,
                    445:                                                                                                         FALSE);
                    446:                /* waitq still locked, thread locked */
                    447: 
                    448:                if (receiver == THREAD_NULL) {
                    449:                        /* 
                    450:                         * no receivers; queue kmsg
                    451:                         */
                    452:                        assert(mqueue->imq_msgcount > 0);
                    453:                        ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
                    454:                        splx(s);
                    455:                        break;
                    456:                }
                    457:                
                    458:                /*
                    459:                 * We found a waiting thread.
                    460:                 * If the message is too large or the scatter list is too small
                    461:                 * the thread we wake up will get that as its status.
                    462:                 */
                    463:                if (receiver->ith_msize <
                    464:                    (kmsg->ikm_header.msgh_size) +
                    465:                    REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
                    466:                        receiver->ith_msize = kmsg->ikm_header.msgh_size;
                    467:                        receiver->ith_state = MACH_RCV_TOO_LARGE;
                    468:                } else {
                    469:                        receiver->ith_state =
                    470: #ifdef DOITFORSCATTER
                    471:                    (receiver->ith_scatter_list != MACH_MSG_BODY_NULL) ?
                    472:                    ipc_kmsg_check_scatter(kmsg,
                    473:                                           receiver->ith_option,
                    474:                                           &receiver->ith_scatter_list, 
                    475:                                           &receiver->ith_scatter_list_size) :
                    476: #endif /* DOITFORSCATTER */
                    477:                    MACH_MSG_SUCCESS;
                    478:                }
                    479: 
                    480:                /*
                    481:                 * If there is no problem with the upcoming receive, or the
                    482:                 * receiver thread didn't specifically ask for special too
                    483:                 * large error condition, go ahead and select it anyway.
                    484:                 */
                    485:                if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
                    486:                    !(receiver->ith_option & MACH_RCV_LARGE)) {
                    487: 
                    488:                        receiver->ith_kmsg = kmsg;
                    489:                        receiver->ith_seqno = mqueue->imq_seqno++;
                    490:                        thread_unlock(receiver);
                    491:                        splx(s);
                    492: 
                    493:                        /* we didn't need our reserved spot in the queue */
                    494:                        ipc_mqueue_release_msgcount(mqueue);
                    495:                        break;
                    496:                }
                    497: 
                    498:                /*
                    499:                 * Otherwise, this thread needs to be released to run
                    500:                 * and handle its error without getting the message.  We
                    501:                 * need to go back and pick another one.
                    502:                 */
                    503:                receiver->ith_kmsg = IKM_NULL;
                    504:                receiver->ith_seqno = 0;
                    505:                thread_unlock(receiver);
                    506:                splx(s);
                    507:        }
                    508: 
                    509:        imq_unlock(mqueue);
                    510:        current_task()->messages_sent++;
                    511:        return;
                    512: }
                    513: 
                    514: 
                    515: /*
                    516:  *     Routine:        ipc_mqueue_receive
                    517:  *     Purpose:
                    518:  *             Receive a message from a message queue.
                    519:  *
                    520:  *             If a message is already queued (on this port, or on a
                    521:  *             member port when mqueue belongs to a port set), it is
                    522:  *             picked up immediately through ipc_mqueue_select().
                    523:  *             Otherwise the calling thread waits on the mqueue's
                    524:  *             wait queue until a message is handed to it, the wait
                    525:  *             times out, or the wait is interrupted or restarted.
                          *
                          *             NOTE(review): this comment used to describe
                          *             "continuation" and "resume" arguments.  The signature
                          *             below has neither, and thread_block() is called with
                          *             a null continuation.
                          *     Parameters:
                          *             mqueue          Queue (port or port set) to receive on.
                          *             option          MACH_RCV_* option bits; MACH_RCV_TIMEOUT
                          *                             and MACH_RCV_LARGE are examined here.
                          *             max_size        Largest message (plus requested
                          *                             trailer) the caller will accept.
                          *             timeout         Used only with MACH_RCV_TIMEOUT; zero
                          *                             means "do not block".  Passed to
                          *                             thread_set_timer() with a scale of
                          *                             1000*NSEC_PER_USEC (presumably
                          *                             milliseconds - TODO confirm).
                          *             interruptible   Passed through to the wait queue.
                          *             kmsgp           Out: the message, or its size (see
                          *                             Returns).
                          *             seqnop          Out: sequence number of the message.
                    526:  *     Conditions:
                    527:  *             Our caller must hold a reference for the port or port set
                    528:  *             to which this queue belongs, to keep the queue
                    529:  *             from being deallocated.
                    530:  *
                    531:  *             The kmsg is returned with clean header fields
                    532:  *             and with the circular bit turned off.
                    533:  *     Returns:
                    534:  *             MACH_MSG_SUCCESS        Message returned in kmsgp.
                    535:  *             MACH_RCV_TOO_LARGE      Message size returned in kmsgp.
                          *                                     (Only when MACH_RCV_LARGE is
                          *                                     set; otherwise the message
                          *                                     itself is returned in kmsgp.)
                    536:  *             MACH_RCV_TIMED_OUT      No message obtained.
                    537:  *             MACH_RCV_INTERRUPTED    No message obtained.
                    538:  *             MACH_RCV_PORT_DIED      Port/set died; no message.
                    539:  *             MACH_RCV_PORT_CHANGED   Port moved into set; no msg.
                          *
                          *             The wakeup path can also return
                          *             MACH_RCV_SCATTER_SMALL when the waker left that
                          *             value in ith_state.
                    540:  *
                    541:  */
                    542: 
                    543: mach_msg_return_t
                    544: ipc_mqueue_receive(
                    545:        ipc_mqueue_t            mqueue,
                    546:        mach_msg_option_t       option,
                    547:        mach_msg_size_t         max_size,
                    548:        mach_msg_timeout_t      timeout,
                    549:        int                     interruptible,
                    550:        ipc_kmsg_t                  *kmsgp,
                    551:        mach_port_seqno_t       *seqnop)
                    552: {
                    554:        mach_msg_return_t mr;
                    555:        ipc_kmsg_queue_t kmsgs;
                    556:        kern_return_t   save_wait_result;
                    557:        thread_t self;
                    558: 
                    559:        imq_lock(mqueue);
                    560:        
                    561:        if (imq_is_set(mqueue)) {
                    562:                wait_queue_link_t wql;
                    563:                ipc_mqueue_t port_mq;
                    564:                queue_t q;
                    565: 
                    566:                q = &mqueue->imq_setlinks;
                    567: 
                    568:                /*
                    569:                 * If we are waiting on a portset mqueue, we need to see if
                    570:                 * any of the member ports have work for us.  If so, try to
                    571:                 * deliver one of those messages. By holding the portset's
                    572:                 * mqueue lock during the search, we tie up any attempts by
                    573:                 * mqueue_deliver or portset membership changes that may
                    574:                 * cross our path. But this is a lock order violation, so we
                    575:                 * have to do it "softly."  If we don't find a message waiting
                    576:                 * for us, we will assert our intention to wait while still
                    577:                 * holding that lock.  When we release the lock, the deliver/
                    578:                 * change will succeed and find us.
                    579:                 */
                    580:        search_set:
                    581:                queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) {
                    582:                        port_mq = (ipc_mqueue_t)wql->wql_queue;
                    583:                        kmsgs = &port_mq->imq_messages;
                    584:                        
                    585:                        if (!ipc_kmsg_queue_first(kmsgs))  /* peek first */
                    586:                                continue;
                    587: 
                                                 /*
                                                  * The member port's lock is only tried, never
                                                  * waited for.  On failure the set lock is dropped
                                                  * and retaken, and the scan restarts from the head.
                                                  */
                    588:                        if (!imq_lock_try(port_mq)) {
                    589:                                imq_unlock(mqueue);
                    590:                                imq_lock(mqueue);
                    591:                                goto search_set; /* start again at beginning - SMP */
                    592:                        }
                    593: 
                    594:                        /*
                    595:                         * If there is still a message to be had, we will
                    596:                         * try to select it (may not succeed because of size
                    597:                         * and options).  In any case, we deliver those
                    598:                         * results back to the user.
                    599:                         *
                    600:                         * We also move the port's linkage to the tail of the
                    601:                         * list for this set (fairness). Future versions will
                    602:                         * sort by timestamp or priority.
                    603:                         */
                    604:                        if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
                    605:                                imq_unlock(port_mq);
                    606:                                continue;
                    607:                        }
                    608:                        queue_remove(q, wql, wait_queue_link_t, wql_sublinks);
                    609:                        queue_enter(q, wql, wait_queue_link_t, wql_sublinks);
                    610:                        imq_unlock(mqueue);
                    611: 
                    612:                        mr = ipc_mqueue_select(port_mq, option, max_size, kmsgp, seqnop);
                    613:                        imq_unlock(port_mq);
                    614:                        return mr;
                    615:                        
                    616:                }
                    617: 
                    618:        } else {
                    619: 
                    620:                /*
                    621:                 * Receive on a single port. Just try to get the messages.
                    622:                 */
                    623:                kmsgs = &mqueue->imq_messages;
                    624:                if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
                    625:                        mr = ipc_mqueue_select(mqueue, option, max_size, kmsgp, seqnop);
                    626:                        imq_unlock(mqueue);
                    627:                        return mr;
                    628:                }
                    629:        }
                    630:                
                    631:        /*
                    632:         * Looks like we'll have to block.  The mqueue we will
                    633:         * block on (whether the set's or the local port's) is
                    634:         * still locked.
                    635:         */
                    636:        if (option & MACH_RCV_TIMEOUT) {
                    637:                if (timeout == 0) {
                    638:                        imq_unlock(mqueue);
                    639:                        return MACH_RCV_TIMED_OUT;
                    640:                }
                    641:        }
                    642:        self = current_thread();
                    643: 
                                 /*
                                  * Record the receive parameters on the thread before waiting;
                                  * ith_state, ith_kmsg, ith_seqno and ith_msize are read back
                                  * below once we have been awakened.
                                  */
                    644:        self->ith_state = MACH_RCV_IN_PROGRESS;
                    645:        self->ith_option = option;
                    646:        self->ith_msize = max_size;
                    647:                
                    648:        wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
                    649:                                                                  IPC_MQUEUE_RECEIVE,
                    650:                                                                  interruptible,
                    651:                                                                  TRUE); /* unlock? */
                    652:        /* mqueue/waitq is unlocked */
                    653: 
                    654:        if (option & MACH_RCV_TIMEOUT) {
                    655:                thread_set_timer(timeout, 1000*NSEC_PER_USEC);
                    656:        }
                    657: 
                                 /* statistics only: the two branches differ just in the counter */
                    658:        if (interruptible == THREAD_ABORTSAFE) {
                    659:                counter(c_ipc_mqueue_receive_block_user++);
                    660:        } else {
                    661:                counter(c_ipc_mqueue_receive_block_kernel++);
                    662:        }
                    663: 
                                 /* block with a null continuation */
                    664:        save_wait_result = thread_block((void (*)(void))0);
                    665: 
                    666:        if (option & MACH_RCV_TIMEOUT)
                    667:                thread_cancel_timer();
                    668: 
                    669:        /*
                    670:         * why did we wake up?
                    671:         */
                    672:        switch (save_wait_result) {
                    673:        case THREAD_TIMED_OUT:
                    674:                return MACH_RCV_TIMED_OUT;
                    675: 
                    676:        case THREAD_INTERRUPTED:
                    677:                return MACH_RCV_INTERRUPTED;
                    678: 
                    679:        case THREAD_RESTART:
                    680:                /* something bad happened to the port/set */
                    681:                return MACH_RCV_PORT_CHANGED;
                    682: 
                    683:        case THREAD_AWAKENED:
                    684:                /*
                    685:                 * We do not need to go select a message, somebody
                    686:                 * handed us one (or a too-large indication).
                    687:                 */
                    688:                
                    689:                mr = MACH_MSG_SUCCESS;
                    690: 
                    691:                switch (self->ith_state) {
                    692:                case MACH_RCV_SCATTER_SMALL:
                    693:                case MACH_RCV_TOO_LARGE:
                    694:                        /*
                    695:                         * Somebody tried to give us a too large
                    696:                         * message. If we indicated that we cared,
                    697:                         * then they only gave us the indication,
                    698:                         * otherwise they gave us the indication
                    699:                         * AND the message anyway.
                    700:                         */
                    701:                        if (option & MACH_RCV_LARGE) {
                                                         /*
                                                          * Only the TOO_LARGE case reports a size;
                                                          * *seqnop is left untouched on this path.
                                                          */
                    702:                                if (self->ith_state == MACH_RCV_TOO_LARGE)
                    703:                                        *kmsgp = (ipc_kmsg_t)self->ith_msize;
                    704:                                return(self->ith_state);
                    705:                        } else {
                    706:                                mr = self->ith_state;
                    707:                                /* fall thru to SUCCESS case */
                    708:                        }
                    709: 
                    710:                case MACH_MSG_SUCCESS:
                    711:                        /* pick up the message that was handed to us */
                    712:                        *kmsgp = self->ith_kmsg;
                    713:                        *seqnop = self->ith_seqno;
                    714:                        return mr;
                    715: 
                    716:                default:
                    717:                        panic("ipc_mqueue_receive: strange ith_state");
                    718:                }
                    719: 
                    720:        default:
                    721:                panic("ipc_mqueue_receive: strange wait_result");
                    722:        }
                    723: }
                    724: 
                    725: 
                    726: /*
                    727:  *     Routine:        ipc_mqueue_select
                    728:  *     Purpose:
                    729:  *             A receiver discovered that there was a message on the queue
                    730:  *             before he had to block.  Pick the message off the queue and
                    731:  *             "post" it to himself.
                    732:  *     Conditions:
                    733:  *             mqueue locked.
                    734:  *             There is a message.
                    735:  *     Returns:
                    736:  *             MACH_MSG_SUCCESS        Actually selected a message for ourselves.
                    737:  *             MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
                    738:  */
                    739: kern_return_t
                    740: ipc_mqueue_select(
                    741:        ipc_mqueue_t            mqueue,
                    742:        mach_msg_option_t       option,
                    743:        mach_msg_size_t         max_size,
                    744:        ipc_kmsg_t                  *kmsgp,
                    745:        mach_port_seqno_t       *seqnop)
                    746: {              
                    747:        ipc_kmsg_t kmsg;
                    748:        mach_port_seqno_t seqno;
                    749:        mach_msg_return_t mr;
                    750: 
                    751:        mr = MACH_MSG_SUCCESS;
                    752: 
                    753: 
                    754:        /*
                    755:         * Do some sanity checking of our ability to receive
                    756:         * before pulling the message off the queue.
                    757:         */
                    758:        kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
                    759: 
                    760:        assert(kmsg != IKM_NULL);
                    761: 
                    762:        if (kmsg->ikm_header.msgh_size + 
                    763:                REQUESTED_TRAILER_SIZE(option) > max_size) {
                    764:                mr = MACH_RCV_TOO_LARGE;
                    765:        } 
                    766: #ifdef DOITFORSCATTER
                    767:        else if (self->ith_scatter_list != MACH_MSG_BODY_NULL) {
                    768:                mr = ipc_kmsg_check_scatter(
                    769:                                                                        kmsg,
                    770:                                                                        self->ith_option,
                    771:                                                                        &self->ith_scatter_list, 
                    772:                                                                        &self->ith_scatter_list_size);
                    773:        }
                    774: #endif /* DOITFORSCATTER */
                    775: 
                    776:        /*
                    777:         * If we really can't receive it, but we had the
                    778:         * MACH_RCV_LARGE option set, then don't take it off
                    779:         * the queue, instead return the appropriate error
                    780:         * (and size needed).
                    781:         */
                    782:        if (((mr == MACH_RCV_TOO_LARGE) ||
                    783:                 (mr == MACH_RCV_SCATTER_SMALL)) &&
                    784:                (option & MACH_RCV_LARGE)) {
                    785:                *seqnop = 0;
                    786:                *kmsgp = (ipc_kmsg_t)kmsg->ikm_header.msgh_size;
                    787:                return mr;
                    788:        }
                    789: 
                    790:        ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
                    791:        ipc_mqueue_release_msgcount(mqueue);
                    792:        *seqnop = mqueue->imq_seqno++;
                    793:        *kmsgp = kmsg;
                    794: 
                    795:        current_task()->messages_received++;
                    796:        return mr;
                    797: }
                    798: 
                    799: /*
                    800:  *     Routine:        ipc_mqueue_destroy
                    801:  *     Purpose:
                    802:  *             Destroy a message queue.  Set any blocked senders running.
                    803:  *             Destroy the kmsgs in the queue.
                          *     Parameters:
                          *             mqueue          Queue to drain.  It is locked and
                          *                             unlocked here.
                    804:  *     Conditions:
                    805:  *             Nothing locked.
                    806:  *             Receivers were removed when the receive right was "changed"
                    807:  */
                    808: void
                    809: ipc_mqueue_destroy(
                    810:        ipc_mqueue_t    mqueue) 
                    811: {
                    812:        ipc_kmsg_queue_t kmqueue;
                    813:        ipc_kmsg_t kmsg;
                    814: 
                    815:        imq_lock(mqueue);
                    816:        /*
                    817:         *      rouse all blocked senders
                    818:         */
                                 /*
                                  * Every thread waiting on the IPC_MQUEUE_FULL event is woken
                                  * with THREAD_AWAKENED.  The final FALSE presumably means
                                  * "leave the queue locked", since the lock is still used
                                  * below - TODO confirm.
                                  */
                    819:        mqueue->imq_fullwaiters = FALSE;
                    820:        wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
                    821:                                                                 IPC_MQUEUE_FULL,
                    822:                                                                 THREAD_AWAKENED,
                    823:                                                                 FALSE);
                    824: 
                    825:        kmqueue = &mqueue->imq_messages;
                    826: 
                                 /*
                                  * Messages are dequeued one at a time under the lock, and
                                  * the lock is dropped around each destroy call.
                                  */
                    827:        while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
                    828:                imq_unlock(mqueue);
                    829:                ipc_kmsg_destroy_dest(kmsg);
                    830:                imq_lock(mqueue);
                    831:        }
                    832:        imq_unlock(mqueue);
                    833: }
                    834: 
                    835: /*
                    836:  *     Routine:        ipc_mqueue_set_qlimit
                    837:  *     Purpose:
                    838:  *             Changes a message queue limit; the maximum number
                    839:  *             of messages which may be queued.
                          *
                          *             When the limit is raised, up to (new - old) threads
                          *             waiting on IPC_MQUEUE_FULL are woken.  Lowering the
                          *             limit, or leaving it unchanged, wakes nobody.
                          *     Parameters:
                          *             mqueue          Queue whose limit is changed.
                          *             qlimit          New limit, stored in imq_qlimit.
                    840:  *     Conditions:
                    841:  *             Nothing locked.
                    842:  */
                    843: 
                    844: void
                    845: ipc_mqueue_set_qlimit(
                    846:         ipc_mqueue_t                   mqueue,
                    847:         mach_port_msgcount_t   qlimit)
                    848: {
                    849:         /* wake up senders allowed by the new qlimit */
                    850:         imq_lock(mqueue);
                    851:         if (qlimit > mqueue->imq_qlimit) {
                    852:                 mach_port_msgcount_t i, wakeup;
                    853: 
                    854:                 /* caution: wakeup, qlimit are unsigned */
                                          /* the guard above keeps this subtraction from wrapping */
                    855:                 wakeup = qlimit - mqueue->imq_qlimit;
                    856: 
                                          /*
                                           * Wake one waiter per newly available slot.  Once the
                                           * wait queue reports nobody waiting, clear the
                                           * full-waiters flag and stop.
                                           */
                    857:                 for (i = 0; i < wakeup; i++) {
                    858:                         if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
                    859:                                                                                          IPC_MQUEUE_FULL,
                    860:                                                                                          THREAD_AWAKENED,
                    861:                                                                                          FALSE) == KERN_NOT_WAITING) {
                    862:                                         mqueue->imq_fullwaiters = FALSE;
                    863:                                         break;
                    864:                         }
                    865:                 }
                    866:         }
                                 /* the new limit is stored whether or not anyone was woken */
                    867:        mqueue->imq_qlimit = qlimit;
                    868:        imq_unlock(mqueue);
                    869: }
                    870: 
                    871: /*
                    872:  *     Routine:        ipc_mqueue_set_seqno
                    873:  *     Purpose:
                    874:  *             Changes an mqueue's sequence number.
                          *     Parameters:
                          *             mqueue          Queue to update.
                          *             seqno           Value stored into imq_seqno.
                    875:  *     Conditions:
                    876:  *             Caller holds a reference to the queue's containing object.
                    877:  */
                    878: void
                    879: ipc_mqueue_set_seqno(
                    880:        ipc_mqueue_t            mqueue,
                    881:        mach_port_seqno_t       seqno)
                    882: {
                                 /* the store is made while holding the mqueue lock */
                    883:        imq_lock(mqueue);
                    884:        mqueue->imq_seqno = seqno;
                    885:        imq_unlock(mqueue);
                    886: }
                    887: 
                    888: 
                    889: /*
                    890:  *     Routine:        ipc_mqueue_copyin
                    891:  *     Purpose:
                    892:  *             Convert a name in a space to a message queue.
                          *     Parameters:
                          *             space           IPC space in which name is looked up.
                          *             name            Port or port-set name to translate.
                          *             mqueuep         Out: the port's or set's message queue.
                          *             objectp         Out: the object owning that queue, with
                          *                             one reference added for the caller.
                    893:  *     Conditions:
                    894:  *             Nothing locked.  If successful, the caller gets a ref
                    895:  *             for the object. This ref ensures the continued existence of
                    896:  *             the queue.
                    897:  *     Returns:
                    898:  *             MACH_MSG_SUCCESS        Found a message queue.
                    899:  *             MACH_RCV_INVALID_NAME   The space is dead.
                    900:  *             MACH_RCV_INVALID_NAME   The name doesn't denote a right.
                    901:  *             MACH_RCV_INVALID_NAME
                    902:  *                     The denoted right is not receive or port set.
                    903:  *             MACH_RCV_IN_SET         Receive right is a member of a set.
                          *
                          *             NOTE(review): the code below never returns
                          *             MACH_RCV_IN_SET; only MACH_MSG_SUCCESS and
                          *             MACH_RCV_INVALID_NAME are produced here.
                    904:  */
                    905: 
                    906: mach_msg_return_t
                    907: ipc_mqueue_copyin(
                    908:        ipc_space_t             space,
                    909:        mach_port_name_t        name,
                    910:        ipc_mqueue_t            *mqueuep,
                    911:        ipc_object_t            *objectp)
                    912: {
                    913:        ipc_entry_t entry;
                    914:        ipc_object_t object;
                    915:        ipc_mqueue_t mqueue;
                    916: 
                    917:        is_read_lock(space);
                    918:        if (!space->is_active) {
                    919:                is_read_unlock(space);
                    920:                return MACH_RCV_INVALID_NAME;
                    921:        }
                    922: 
                    923:        entry = ipc_entry_lookup(space, name);
                    924:        if (entry == IE_NULL) {
                    925:                is_read_unlock(space);
                    926:                return MACH_RCV_INVALID_NAME;
                    927:        }
                    928: 
                    929:        object = entry->ie_object;
                    930: 
                                 /*
                                  * In both branches the object is locked before the space
                                  * read lock is released.
                                  */
                    931:        if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
                    932:                ipc_port_t port;
                    934: 
                    935:                port = (ipc_port_t) object;
                    936:                assert(port != IP_NULL);
                    937: 
                    938:                ip_lock(port);
                    939:                assert(ip_active(port));
                    940:                assert(port->ip_receiver_name == name);
                    941:                assert(port->ip_receiver == space);
                    942:                is_read_unlock(space);
                    943:                mqueue = &port->ip_messages;
                    944: 
                    945:        } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
                    946:                ipc_pset_t pset;
                    947: 
                    948:                pset = (ipc_pset_t) object;
                    949:                assert(pset != IPS_NULL);
                    950: 
                    951:                ips_lock(pset);
                    952:                assert(ips_active(pset));
                    953:                assert(pset->ips_local_name == name);
                    954:                is_read_unlock(space);
                    955: 
                    956:                mqueue = &pset->ips_messages;
                    957:        } else {
                    958:                is_read_unlock(space);
                    959:                return MACH_RCV_INVALID_NAME;
                    960:        }
                    961: 
                    962:        /*
                    963:         *      At this point, the object is locked and active,
                    964:         *      the space is unlocked, and mqueue is initialized.
                    965:         */
                    966: 
                                 /* take the caller's reference before dropping the object lock */
                    967:        io_reference(object);
                    968:        io_unlock(object);
                    969: 
                    970:        *objectp = object;
                    971:        *mqueuep = mqueue;
                    972:        return MACH_MSG_SUCCESS;
                    973: }
                    974: 

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.