Annotation of XNU/osfmk/ipc/ipc_mqueue.c, revision 1.1

1.1     ! root        1: /*
        !             2:  * Copyright (c) 2000 Apple Computer, Inc. All rights reserved.
        !             3:  *
        !             4:  * @APPLE_LICENSE_HEADER_START@
        !             5:  * 
        !             6:  * The contents of this file constitute Original Code as defined in and
        !             7:  * are subject to the Apple Public Source License Version 1.1 (the
        !             8:  * "License").  You may not use this file except in compliance with the
        !             9:  * License.  Please obtain a copy of the License at
        !            10:  * http://www.apple.com/publicsource and read it before using this file.
        !            11:  * 
        !            12:  * This Original Code and all software distributed under the License are
        !            13:  * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
        !            14:  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
        !            15:  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
        !            16:  * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT.  Please see the
        !            17:  * License for the specific language governing rights and limitations
        !            18:  * under the License.
        !            19:  * 
        !            20:  * @APPLE_LICENSE_HEADER_END@
        !            21:  */
        !            22: /*
        !            23:  * @OSF_FREE_COPYRIGHT@
        !            24:  */
        !            25: /* 
        !            26:  * Mach Operating System
        !            27:  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
        !            28:  * All Rights Reserved.
        !            29:  * 
        !            30:  * Permission to use, copy, modify and distribute this software and its
        !            31:  * documentation is hereby granted, provided that both the copyright
        !            32:  * notice and this permission notice appear in all copies of the
        !            33:  * software, derivative works or modified versions, and any portions
        !            34:  * thereof, and that both notices appear in supporting documentation.
        !            35:  * 
        !            36:  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
        !            37:  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
        !            38:  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
        !            39:  * 
        !            40:  * Carnegie Mellon requests users of this software to return to
        !            41:  * 
        !            42:  *  Software Distribution Coordinator  or  [email protected]
        !            43:  *  School of Computer Science
        !            44:  *  Carnegie Mellon University
        !            45:  *  Pittsburgh PA 15213-3890
        !            46:  * 
        !            47:  * any improvements or extensions that they make and grant Carnegie Mellon
        !            48:  * the rights to redistribute these changes.
        !            49:  */
        !            50: /*
        !            51:  */
        !            52: /*
        !            53:  *     File:   ipc/ipc_mqueue.c
        !            54:  *     Author: Rich Draves
        !            55:  *     Date:   1989
        !            56:  *
        !            57:  *     Functions to manipulate IPC message queues.
        !            58:  */
        !            59: 
        !            60: #include <dipc.h>
        !            61: 
        !            62: #include <mach/port.h>
        !            63: #include <mach/message.h>
        !            64: #include <mach/sync_policy.h>
        !            65: 
        !            66: #include <kern/assert.h>
        !            67: #include <kern/counters.h>
        !            68: #include <kern/sched_prim.h>
        !            69: #include <kern/ipc_kobject.h>
        !            70: #include <kern/misc_protos.h>
        !            71: #include <kern/task.h>
        !            72: #include <kern/thread.h>
        !            73: #include <kern/wait_queue.h>
        !            74: 
        !            75: #include <ipc/ipc_mqueue.h>
        !            76: #include <ipc/ipc_kmsg.h>
        !            77: #include <ipc/ipc_port.h>
        !            78: #include <ipc/ipc_pset.h>
        !            79: #include <ipc/ipc_space.h>
        !            80: 
        !            81: #include <ddb/tr.h>
        !            82: 
        !            83: int ipc_mqueue_full;           /* address is event for queue space */
        !            84: int ipc_mqueue_rcv;            /* address is event for message arrival */
        !            85: 
        !            86: #define TR_ENABLE 0
        !            87: 
        !            88: /*
        !            89:  *     Routine:        ipc_mqueue_init
        !            90:  *     Purpose:
        !            91:  *             Initialize a newly-allocated message queue.
        !            92:  */
        !            93: void
        !            94: ipc_mqueue_init(
        !            95:        ipc_mqueue_t    mqueue,
        !            96:        boolean_t       is_set)
        !            97: {
        !            98:        if (is_set) {
        !            99:                wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
        !           100:        } else {
        !           101:                wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
        !           102:                ipc_kmsg_queue_init(&mqueue->imq_messages);
        !           103:                mqueue->imq_seqno = 0;
        !           104:                mqueue->imq_msgcount = 0;
        !           105:                mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
        !           106:                mqueue->imq_fullwaiters = FALSE;
        !           107:        }
        !           108: }
        !           109: 
        !           110: /*
        !           111:  *     Routine:        ipc_mqueue_member
        !           112:  *     Purpose:
        !           113:  *             Indicate whether the (port) mqueue is a member of
        !           114:  *             this portset's mqueue.  We do this by checking
        !           115:  *             whether the portset mqueue's waitq is an member of
        !           116:  *             the port's mqueue waitq.
        !           117:  *     Conditions:
        !           118:  *             the portset's mqueue is not already a member
        !           119:  *             this may block while allocating linkage structures.
        !           120:  */
        !           121: 
        !           122: boolean_t
        !           123: ipc_mqueue_member(
        !           124:        ipc_mqueue_t    port_mqueue,
        !           125:        ipc_mqueue_t    set_mqueue)
        !           126: {
        !           127:        wait_queue_t    port_waitq = &port_mqueue->imq_wait_queue;
        !           128:        wait_queue_t    set_waitq = &set_mqueue->imq_wait_queue;
        !           129: 
        !           130:        return (wait_queue_member(port_waitq, set_waitq));
        !           131: 
        !           132: }
        !           133: 
        !           134: /*
        !           135:  *     Routine:        ipc_mqueue_remove
        !           136:  *     Purpose:
        !           137:  *             Remove the association between the queue and the specified
        !           138:  *             subordinate message queue.
        !           139:  */
        !           140: 
        !           141: kern_return_t
        !           142: ipc_mqueue_remove(
        !           143:        ipc_mqueue_t     mqueue,
        !           144:        ipc_mqueue_t     sub_mqueue)
        !           145: {
        !           146:        wait_queue_t     mq_waitq = &mqueue->imq_wait_queue;
        !           147:        wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue;
        !           148: 
        !           149:        if (wait_queue_member(mq_waitq, sub_waitq)) {
        !           150:            wait_queue_unlink(mq_waitq, sub_waitq);
        !           151:            return KERN_SUCCESS;
        !           152:        }
        !           153:        return KERN_NOT_IN_SET;    
        !           154: }
        !           155: 
        !           156: /*
        !           157:  *     Routine:        ipc_mqueue_remove_one
        !           158:  *     Purpose:
        !           159:  *             Find and remove one subqueue from the queue.
        !           160:  *     Conditions:
        !           161:  *             Will return the set mqueue that was removed
        !           162:  */
        !           163: void
        !           164: ipc_mqueue_remove_one(
        !           165:        ipc_mqueue_t    mqueue,
        !           166:        ipc_mqueue_t    *sub_queuep)
        !           167: {
        !           168:        wait_queue_t    mq_waitq = &mqueue->imq_wait_queue;
        !           169: 
        !           170:        wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep);
        !           171:        return;
        !           172: }
        !           173: 
        !           174: 
        !           175: /*
        !           176:  *     Routine:        ipc_mqueue_add
        !           177:  *     Purpose:
        !           178:  *             Associate the portset's mqueue with the port's mqueue.
        !           179:  *             This has to be done so that posting the port will wakeup
        !           180:  *             a portset waiter.  If there are waiters on the portset
        !           181:  *             mqueue and messages on the port mqueue, try to match them
        !           182:  *             up now.
        !           183:  *     Conditions:
        !           184:  *             May block.
        !           185:  */
        !           186: kern_return_t
        !           187: ipc_mqueue_add(
        !           188:        ipc_mqueue_t     port_mqueue,
        !           189:        ipc_mqueue_t     set_mqueue)
        !           190: {
        !           191:        wait_queue_t     port_waitq = &port_mqueue->imq_wait_queue;
        !           192:        wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue;
        !           193:        ipc_kmsg_queue_t kmsgq;
        !           194:        ipc_kmsg_t       kmsg, next;
        !           195:        kern_return_t    kr;
        !           196: 
        !           197:        kr = wait_queue_link(port_waitq, set_waitq);
        !           198:        if (kr != KERN_SUCCESS)
        !           199:                return kr;
        !           200: 
        !           201:        /*
        !           202:         * Now that the set has been added to the port, there may be
        !           203:         * messages queued on the port and threads waiting on the set
        !           204:         * waitq.  Lets get them together.
        !           205:         */
        !           206:        imq_lock(port_mqueue);
        !           207:        kmsgq = &port_mqueue->imq_messages;
        !           208:        for (kmsg = ipc_kmsg_queue_first(kmsgq);
        !           209:             kmsg != IKM_NULL;
        !           210:             kmsg = next) {
        !           211:                next = ipc_kmsg_queue_next(kmsgq, kmsg);
        !           212: 
        !           213:                for (;;) {
        !           214:                        thread_t th;
        !           215:                        spl_t s;
        !           216: 
        !           217:                        s = splsched();
        !           218:                        th = wait_queue_wakeup_identity_locked(port_waitq,
        !           219:                                                                                                   IPC_MQUEUE_RECEIVE,
        !           220:                                                                                                   THREAD_AWAKENED,
        !           221:                                                                                                   FALSE);
        !           222:                        /* waitq/mqueue still locked, thread locked */
        !           223: 
        !           224:                        if (th == THREAD_NULL) {
        !           225:                                splx(s);
        !           226:                                goto leave;
        !           227:                        }
        !           228: 
        !           229:                        /*
        !           230:                         * Found a receiver. see if they can handle the message
        !           231:                         * correctly (the message is not too large for them, or
        !           232:                         * they didn't care to be informed that the message was
        !           233:                         * too large).  If they can't handle it, take them off
        !           234:                         * the list and let them go back and figure it out and
        !           235:                         * just move onto the next.
        !           236:                         */
        !           237:                        if (th->ith_msize <
        !           238:                            kmsg->ikm_header.msgh_size +
        !           239:                            REQUESTED_TRAILER_SIZE(th->ith_option)) {
        !           240:                                th->ith_state = MACH_RCV_TOO_LARGE;
        !           241:                                th->ith_msize = kmsg->ikm_header.msgh_size;
        !           242:                                if (th->ith_option & MACH_RCV_LARGE) {
        !           243:                                        /*
        !           244:                                         * let him go without message
        !           245:                                         */
        !           246:                                        th->ith_kmsg = IKM_NULL;
        !           247:                                        th->ith_seqno = 0;
        !           248:                                        thread_unlock(th);
        !           249:                                        splx(s);
        !           250:                                        continue; /* find another thread */
        !           251:                                }
        !           252:                        } else {
        !           253:                                th->ith_state = MACH_MSG_SUCCESS;
        !           254:                        }
        !           255: 
        !           256:                        /*
        !           257:                         * This thread is going to take this message,
        !           258:                         * so give it to him.
        !           259:                         */
        !           260:                        ipc_mqueue_release_msgcount(port_mqueue);
        !           261:                        ipc_kmsg_rmqueue(kmsgq, kmsg);
        !           262:                        th->ith_kmsg = kmsg;
        !           263:                        th->ith_seqno = port_mqueue->imq_seqno++;
        !           264:                        thread_unlock(th);
        !           265:                        splx(s);
        !           266:                        break;  /* go to next message */
        !           267:                }
        !           268:                        
        !           269:        }
        !           270:  leave:
        !           271:        imq_unlock(port_mqueue);
        !           272:        return KERN_SUCCESS;
        !           273: }
        !           274: 
        !           275: /*
        !           276:  *     Routine:        ipc_mqueue_changed
        !           277:  *     Purpose:
        !           278:  *             Wake up receivers waiting in a message queue.
        !           279:  *     Conditions:
        !           280:  *             The message queue is locked.
        !           281:  */
        !           282: 
        !           283: void
        !           284: ipc_mqueue_changed(
        !           285:        ipc_mqueue_t            mqueue)
        !           286: {
        !           287:        wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
        !           288:                                                                 IPC_MQUEUE_RECEIVE,
        !           289:                                                                 THREAD_RESTART,
        !           290:                                                                 FALSE);                /* unlock waitq? */
        !           291: }
        !           292: 
        !           293: 
        !           294:                
        !           295: 
        !           296: /*
        !           297:  *     Routine:        ipc_mqueue_send
        !           298:  *     Purpose:
        !           299:  *             Send a message to a message queue.  The message holds a reference
        !           300:  *             for the destination port for this message queue in the 
        !           301:  *             msgh_remote_port field.
        !           302:  *
        !           303:  *             If unsuccessful, the caller still has possession of
        !           304:  *             the message and must do something with it.  If successful,
        !           305:  *             the message is queued, given to a receiver, or destroyed.
        !           306:  *     Conditions:
        !           307:  *             Nothing locked.
        !           308:  *     Returns:
        !           309:  *             MACH_MSG_SUCCESS        The message was accepted.
        !           310:  *             MACH_SEND_TIMED_OUT     Caller still has message.
        !           311:  *             MACH_SEND_INTERRUPTED   Caller still has message.
        !           312:  */
        !           313: mach_msg_return_t
        !           314: ipc_mqueue_send(
        !           315:        ipc_mqueue_t            mqueue,
        !           316:        ipc_kmsg_t                      kmsg,
        !           317:        mach_msg_option_t       option,
        !           318:        mach_msg_timeout_t      timeout)
        !           319: {
        !           320:    int save_wait_result;
        !           321: 
        !           322:        /*
        !           323:         *  Don't block if:
        !           324:         *      1) We're under the queue limit.
        !           325:         *      2) Caller used the MACH_SEND_ALWAYS internal option.
        !           326:         *      3) Message is sent to a send-once right.
        !           327:         */
        !           328:        imq_lock(mqueue);
        !           329: 
        !           330:        if (!imq_full(mqueue) ||
        !           331:                (option & MACH_SEND_ALWAYS) ||
        !           332:                (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
        !           333:                 MACH_MSG_TYPE_PORT_SEND_ONCE)) {
        !           334:                mqueue->imq_msgcount++;
        !           335:                imq_unlock(mqueue);
        !           336: 
        !           337:        } else {
        !           338: 
        !           339:                /* 
        !           340:                 * We have to wait for space to be granted to us.
        !           341:                 */
        !           342:                if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) {
        !           343:                        imq_unlock(mqueue);
        !           344:                        return MACH_SEND_TIMED_OUT;
        !           345:                }
        !           346:                mqueue->imq_fullwaiters = TRUE;
        !           347:                wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
        !           348:                                                                          IPC_MQUEUE_FULL,
        !           349:                                                                          THREAD_ABORTSAFE,
        !           350:                                                                          TRUE); /* unlock? */
        !           351:                /* wait/mqueue is unlocked */
        !           352:                
        !           353:                if (option & MACH_SEND_TIMEOUT)
        !           354:                        thread_set_timer(timeout, 1000*NSEC_PER_USEC);
        !           355: 
        !           356:                counter(c_ipc_mqueue_send_block++);
        !           357:                save_wait_result = thread_block((void (*)(void)) 0);
        !           358:                
        !           359:                if (option & MACH_SEND_TIMEOUT)
        !           360:                        thread_cancel_timer();
        !           361: 
        !           362: 
        !           363:                switch (save_wait_result) {
        !           364:                case THREAD_TIMED_OUT:
        !           365:                        assert(option & MACH_SEND_TIMEOUT);
        !           366:                        return MACH_SEND_TIMED_OUT;
        !           367:                        
        !           368:                case THREAD_AWAKENED:
        !           369:                        break;          /* we can proceed - inherited msgcount from waker */
        !           370:                        
        !           371:                case THREAD_INTERRUPTED:
        !           372:                        return MACH_SEND_INTERRUPTED;
        !           373:                        
        !           374:                case THREAD_RESTART:
        !           375:                default:
        !           376:                        panic("ipc_mqueue_send");
        !           377:                }
        !           378:        }
        !           379: 
        !           380:        ipc_mqueue_post(mqueue, kmsg);
        !           381:        return MACH_MSG_SUCCESS;
        !           382: }
        !           383: 
        !           384: /*
        !           385:  *     Routine:        ipc_mqueue_release_msgcount
        !           386:  *     Purpose:
        !           387:  *             Release a message queue reference in the case where we
        !           388:  *             found a waiter.
        !           389:  *
        !           390:  *     Conditions:
        !           391:  *             The message queue is locked
        !           392:  */
        !           393: void
        !           394: ipc_mqueue_release_msgcount(
        !           395:        ipc_mqueue_t mqueue)    
        !           396: {
        !           397:        assert(imq_held(mqueue));
        !           398:        assert(mqueue->imq_msgcount > 0);
        !           399: 
        !           400:        mqueue->imq_msgcount--;
        !           401:        if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
        !           402:                        if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
        !           403:                                                                                         IPC_MQUEUE_FULL,
        !           404:                                                                                         THREAD_AWAKENED,
        !           405:                                                                                         FALSE) != KERN_SUCCESS) {
        !           406:                                        mqueue->imq_fullwaiters = FALSE;
        !           407:                        } else {
        !           408:                                        mqueue->imq_msgcount++;  /* gave it away */
        !           409:                        }
        !           410:        }
        !           411: }
        !           412: 
        !           413: /*
        !           414:  *     Routine:        ipc_mqueue_post
        !           415:  *     Purpose:
        !           416:  *             Post a message to a waiting receiver or enqueue it.  If a
        !           417:  *             receiver is waiting, we can release our reserved space in
        !           418:  *             the message queue.
        !           419:  *
        !           420:  *     Conditions:
        !           421:  *             If we need to queue, our space in the message queue is reserved.
        !           422:  */
        !           423: void
        !           424: ipc_mqueue_post(
        !           425:        register ipc_mqueue_t   mqueue,
        !           426:        register ipc_kmsg_t             kmsg)
        !           427: {
        !           428: 
        !           429:        /*
        !           430:         *      While the msg queue     is locked, we have control of the
        !           431:         *  kmsg, so the ref in it for the port is still good.
        !           432:         *
        !           433:         *      Check for a receiver for the message.
        !           434:         */
        !           435:        imq_lock(mqueue);
        !           436:        for (;;) {
        !           437:                wait_queue_t waitq = &mqueue->imq_wait_queue;
        !           438:                thread_t receiver;
        !           439:                spl_t s;
        !           440: 
        !           441:                s = splsched();
        !           442:                receiver = wait_queue_wakeup_identity_locked(waitq,
        !           443:                                                                                                         IPC_MQUEUE_RECEIVE,
        !           444:                                                                                                         THREAD_AWAKENED,
        !           445:                                                                                                         FALSE);
        !           446:                /* waitq still locked, thread locked */
        !           447: 
        !           448:                if (receiver == THREAD_NULL) {
        !           449:                        /* 
        !           450:                         * no receivers; queue kmsg
        !           451:                         */
        !           452:                        assert(mqueue->imq_msgcount > 0);
        !           453:                        ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
        !           454:                        splx(s);
        !           455:                        break;
        !           456:                }
        !           457:                
        !           458:                /*
        !           459:                 * We found a waiting thread.
        !           460:                 * If the message is too large or the scatter list is too small
        !           461:                 * the thread we wake up will get that as its status.
        !           462:                 */
        !           463:                if (receiver->ith_msize <
        !           464:                    (kmsg->ikm_header.msgh_size) +
        !           465:                    REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
        !           466:                        receiver->ith_msize = kmsg->ikm_header.msgh_size;
        !           467:                        receiver->ith_state = MACH_RCV_TOO_LARGE;
        !           468:                } else {
        !           469:                        receiver->ith_state =
        !           470: #ifdef DOITFORSCATTER
        !           471:                    (receiver->ith_scatter_list != MACH_MSG_BODY_NULL) ?
        !           472:                    ipc_kmsg_check_scatter(kmsg,
        !           473:                                           receiver->ith_option,
        !           474:                                           &receiver->ith_scatter_list, 
        !           475:                                           &receiver->ith_scatter_list_size) :
        !           476: #endif /* DOITFORSCATTER */
        !           477:                    MACH_MSG_SUCCESS;
        !           478:                }
        !           479: 
        !           480:                /*
        !           481:                 * If there is no problem with the upcoming receive, or the
        !           482:                 * receiver thread didn't specifically ask for special too
        !           483:                 * large error condition, go ahead and select it anyway.
        !           484:                 */
        !           485:                if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
        !           486:                    !(receiver->ith_option & MACH_RCV_LARGE)) {
        !           487: 
        !           488:                        receiver->ith_kmsg = kmsg;
        !           489:                        receiver->ith_seqno = mqueue->imq_seqno++;
        !           490:                        thread_unlock(receiver);
        !           491:                        splx(s);
        !           492: 
        !           493:                        /* we didn't need our reserved spot in the queue */
        !           494:                        ipc_mqueue_release_msgcount(mqueue);
        !           495:                        break;
        !           496:                }
        !           497: 
        !           498:                /*
        !           499:                 * Otherwise, this thread needs to be released to run
        !           500:                 * and handle its error without getting the message.  We
        !           501:                 * need to go back and pick another one.
        !           502:                 */
        !           503:                receiver->ith_kmsg = IKM_NULL;
        !           504:                receiver->ith_seqno = 0;
        !           505:                thread_unlock(receiver);
        !           506:                splx(s);
        !           507:        }
        !           508: 
        !           509:        imq_unlock(mqueue);
        !           510:        current_task()->messages_sent++;
        !           511:        return;
        !           512: }
        !           513: 
        !           514: 
        !           515: /*
        !           516:  *     Routine:        ipc_mqueue_receive
        !           517:  *     Purpose:
        !           518:  *             Receive a message from a message queue.
        !           519:  *
        !           520:  *             If continuation is non-zero, then we might discard
        !           521:  *             our kernel stack when we block.  We will continue
        !           522:  *             after unblocking by executing continuation.
        !           523:  *
        !           524:  *             If resume is true, then we are resuming a receive
        !           525:  *             operation after a blocked receive discarded our stack.
        !           526:  *     Conditions:
        !           527:  *             Our caller must hold a reference for the port or port set
        !           528:  *             to which this queue belongs, to keep the queue
        !           529:  *             from being deallocated.
        !           530:  *
        !           531:  *             The kmsg is returned with clean header fields
        !           532:  *             and with the circular bit turned off.
        !           533:  *     Returns:
        !           534:  *             MACH_MSG_SUCCESS        Message returned in kmsgp.
        !           535:  *             MACH_RCV_TOO_LARGE      Message size returned in kmsgp.
        !           536:  *             MACH_RCV_TIMED_OUT      No message obtained.
        !           537:  *             MACH_RCV_INTERRUPTED    No message obtained.
        !           538:  *             MACH_RCV_PORT_DIED      Port/set died; no message.
        !           539:  *             MACH_RCV_PORT_CHANGED   Port moved into set; no msg.
        !           540:  *
        !           541:  */
        !           542: 
        !           543: mach_msg_return_t
        !           544: ipc_mqueue_receive(
        !           545:        ipc_mqueue_t            mqueue,
        !           546:        mach_msg_option_t       option,
        !           547:        mach_msg_size_t         max_size,
        !           548:        mach_msg_timeout_t      timeout,
        !           549:        int                     interruptible,
        !           550:        ipc_kmsg_t                  *kmsgp,
        !           551:        mach_port_seqno_t       *seqnop)
        !           552: {
        !           553:        ipc_port_t port;
        !           554:        mach_msg_return_t mr, mr2;
        !           555:        ipc_kmsg_queue_t kmsgs;
        !           556:        kern_return_t   save_wait_result;
        !           557:        thread_t self;
        !           558: 
        !           559:        imq_lock(mqueue);
        !           560:        
        !           561:        if (imq_is_set(mqueue)) {
        !           562:                wait_queue_link_t wql;
        !           563:                ipc_mqueue_t port_mq;
        !           564:                queue_t q;
        !           565: 
        !           566:                q = &mqueue->imq_setlinks;
        !           567: 
        !           568:                /*
        !           569:                 * If we are waiting on a portset mqueue, we need to see if
        !           570:                 * any of the member ports have work for us.  If so, try to
        !           571:                 * deliver one of those messages. By holding the portset's
        !           572:                 * mqueue lock during the search, we tie up any attempts by
        !           573:                 * mqueue_deliver or portset membership changes that may
        !           574:                 * cross our path. But this is a lock order violation, so we
        !           575:                 * have to do it "softly."  If we don't find a message waiting
        !           576:                 * for us, we will assert our intention to wait while still
        !           577:                 * holding that lock.  When we release the lock, the deliver/
        !           578:                 * change will succeed and find us.
        !           579:                 */
        !           580:        search_set:
        !           581:                queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) {
        !           582:                        port_mq = (ipc_mqueue_t)wql->wql_queue;
        !           583:                        kmsgs = &port_mq->imq_messages;
        !           584:                        
        !           585:                        if (!ipc_kmsg_queue_first(kmsgs))  /* peek first */
        !           586:                                continue;
        !           587: 
        !           588:                        if (!imq_lock_try(port_mq)) {
        !           589:                                imq_unlock(mqueue);
        !           590:                                imq_lock(mqueue);
        !           591:                                goto search_set; /* start again at beginning - SMP */
        !           592:                        }
        !           593: 
        !           594:                        /*
        !           595:                         * If there is still a message to be had, we will
        !           596:                         * try to select it (may not succeed because of size
        !           597:                         * and options).  In any case, we deliver those
        !           598:                         * results back to the user.
        !           599:                         *
        !           600:                         * We also move the port's linkage to the tail of the
        !           601:                         * list for this set (fairness). Future versions will
        !           602:                         * sort by timestamp or priority.
        !           603:                         */
        !           604:                        if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
        !           605:                                imq_unlock(port_mq);
        !           606:                                continue;
        !           607:                        }
        !           608:                        queue_remove(q, wql, wait_queue_link_t, wql_sublinks);
        !           609:                        queue_enter(q, wql, wait_queue_link_t, wql_sublinks);
        !           610:                        imq_unlock(mqueue);
        !           611: 
        !           612:                        mr = ipc_mqueue_select(port_mq, option, max_size, kmsgp, seqnop);
        !           613:                        imq_unlock(port_mq);
        !           614:                        return mr;
        !           615:                        
        !           616:                }
        !           617: 
        !           618:        } else {
        !           619: 
        !           620:                /*
        !           621:                 * Receive on a single port. Just try to get the messages.
        !           622:                 */
        !           623:                kmsgs = &mqueue->imq_messages;
        !           624:                if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
        !           625:                        mr = ipc_mqueue_select(mqueue, option, max_size, kmsgp, seqnop);
        !           626:                        imq_unlock(mqueue);
        !           627:                        return mr;
        !           628:                }
        !           629:        }
        !           630:                
        !           631:        /*
        !           632:         * Looks like we'll have to block.  The mqueue we will
        !           633:         * block on (whether the set's or the local port's) is
        !           634:         * still locked.
        !           635:         */
        !           636:        if (option & MACH_RCV_TIMEOUT) {
        !           637:                if (timeout == 0) {
        !           638:                        imq_unlock(mqueue);
        !           639:                        return MACH_RCV_TIMED_OUT;
        !           640:                }
        !           641:        }
        !           642:        self = current_thread();
        !           643: 
        !           644:        self->ith_state = MACH_RCV_IN_PROGRESS;
        !           645:        self->ith_option = option;
        !           646:        self->ith_msize = max_size;
        !           647:                
        !           648:        wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
        !           649:                                                                  IPC_MQUEUE_RECEIVE,
        !           650:                                                                  interruptible,
        !           651:                                                                  TRUE); /* unlock? */
        !           652:        /* mqueue/waitq is unlocked */
        !           653: 
        !           654:        if (option & MACH_RCV_TIMEOUT) {
        !           655:                thread_set_timer(timeout, 1000*NSEC_PER_USEC);
        !           656:        }
        !           657: 
        !           658:        if (interruptible == THREAD_ABORTSAFE) {
        !           659:                counter(c_ipc_mqueue_receive_block_user++);
        !           660:        } else {
        !           661:                counter(c_ipc_mqueue_receive_block_kernel++);
        !           662:        }
        !           663: 
        !           664:        save_wait_result = thread_block((void (*)(void))0);
        !           665: 
        !           666:        if (option & MACH_RCV_TIMEOUT)
        !           667:                thread_cancel_timer();
        !           668: 
        !           669:        /*
        !           670:         * why did we wake up?
        !           671:         */
        !           672:        switch (save_wait_result) {
        !           673:        case THREAD_TIMED_OUT:
        !           674:                return MACH_RCV_TIMED_OUT;
        !           675: 
        !           676:        case THREAD_INTERRUPTED:
        !           677:                return MACH_RCV_INTERRUPTED;
        !           678: 
        !           679:        case THREAD_RESTART:
        !           680:                /* something bad happened to the port/set */
        !           681:                return MACH_RCV_PORT_CHANGED;
        !           682: 
        !           683:        case THREAD_AWAKENED:
        !           684:                /*
        !           685:                 * We do not need to go select a message, somebody
        !           686:                 * handed us one (or a too-large indication).
        !           687:                 */
        !           688:                
        !           689:                mr = MACH_MSG_SUCCESS;
        !           690: 
        !           691:                switch (self->ith_state) {
        !           692:                case MACH_RCV_SCATTER_SMALL:
        !           693:                case MACH_RCV_TOO_LARGE:
        !           694:                        /*
        !           695:                         * Somebody tried to give us a too large
        !           696:                         * message. If we indicated that we cared,
        !           697:                         * then they only gave us the indication,
        !           698:                         * otherwise they gave us the indication
        !           699:                         * AND the message anyway.
        !           700:                         */
        !           701:                        if (option & MACH_RCV_LARGE) {
        !           702:                                if (self->ith_state == MACH_RCV_TOO_LARGE)
        !           703:                                        *kmsgp = (ipc_kmsg_t)self->ith_msize;
        !           704:                                return(self->ith_state);
        !           705:                        } else {
        !           706:                                mr = self->ith_state;
        !           707:                                /* fall thru to SUCCESS case */
        !           708:                        }
        !           709: 
        !           710:                case MACH_MSG_SUCCESS:
        !           711:                        /* pick up the message that was handed to us */
        !           712:                        *kmsgp = self->ith_kmsg;
        !           713:                        *seqnop = self->ith_seqno;
        !           714:                        return mr;
        !           715: 
        !           716:                default:
        !           717:                        panic("ipc_mqueue_receive: strange ith_state");
        !           718:                }
        !           719: 
        !           720:        default:
        !           721:                panic("ipc_mqueue_receive: strange wait_result");
        !           722:        }
        !           723: }
        !           724: 
        !           725: 
        !           726: /*
        !           727:  *     Routine:        ipc_mqueue_select
        !           728:  *     Purpose:
        !           729:  *             A receiver discovered that there was a message on the queue
        !           730:  *             before he had to block.  Pick the message off the queue and
        !           731:  *             "post" it to himself.
        !           732:  *     Conditions:
        !           733:  *             mqueue locked.
        !           734:  *             There is a message.
        !           735:  *     Returns:
        !           736:  *             MACH_MSG_SUCCESS        Actually selected a message for ourselves.
        !           737:  *             MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
        !           738:  */
        !           739: kern_return_t
        !           740: ipc_mqueue_select(
        !           741:        ipc_mqueue_t            mqueue,
        !           742:        mach_msg_option_t       option,
        !           743:        mach_msg_size_t         max_size,
        !           744:        ipc_kmsg_t                  *kmsgp,
        !           745:        mach_port_seqno_t       *seqnop)
        !           746: {              
        !           747:        ipc_kmsg_t kmsg;
        !           748:        mach_port_seqno_t seqno;
        !           749:        mach_msg_return_t mr;
        !           750: 
        !           751:        mr = MACH_MSG_SUCCESS;
        !           752: 
        !           753: 
        !           754:        /*
        !           755:         * Do some sanity checking of our ability to receive
        !           756:         * before pulling the message off the queue.
        !           757:         */
        !           758:        kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
        !           759: 
        !           760:        assert(kmsg != IKM_NULL);
        !           761: 
        !           762:        if (kmsg->ikm_header.msgh_size + 
        !           763:                REQUESTED_TRAILER_SIZE(option) > max_size) {
        !           764:                mr = MACH_RCV_TOO_LARGE;
        !           765:        } 
        !           766: #ifdef DOITFORSCATTER
        !           767:        else if (self->ith_scatter_list != MACH_MSG_BODY_NULL) {
        !           768:                mr = ipc_kmsg_check_scatter(
        !           769:                                                                        kmsg,
        !           770:                                                                        self->ith_option,
        !           771:                                                                        &self->ith_scatter_list, 
        !           772:                                                                        &self->ith_scatter_list_size);
        !           773:        }
        !           774: #endif /* DOITFORSCATTER */
        !           775: 
        !           776:        /*
        !           777:         * If we really can't receive it, but we had the
        !           778:         * MACH_RCV_LARGE option set, then don't take it off
        !           779:         * the queue, instead return the appropriate error
        !           780:         * (and size needed).
        !           781:         */
        !           782:        if (((mr == MACH_RCV_TOO_LARGE) ||
        !           783:                 (mr == MACH_RCV_SCATTER_SMALL)) &&
        !           784:                (option & MACH_RCV_LARGE)) {
        !           785:                *seqnop = 0;
        !           786:                *kmsgp = (ipc_kmsg_t)kmsg->ikm_header.msgh_size;
        !           787:                return mr;
        !           788:        }
        !           789: 
        !           790:        ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
        !           791:        ipc_mqueue_release_msgcount(mqueue);
        !           792:        *seqnop = mqueue->imq_seqno++;
        !           793:        *kmsgp = kmsg;
        !           794: 
        !           795:        current_task()->messages_received++;
        !           796:        return mr;
        !           797: }
        !           798: 
        !           799: /*
        !           800:  *     Routine:        ipc_mqueue_destroy
        !           801:  *     Purpose:
        !           802:  *             Destroy a message queue.  Set any blocked senders running.
        !           803:  *             Destroy the kmsgs in the queue.
        !           804:  *     Conditions:
        !           805:  *             Nothing locked.
        !           806:  *             Receivers were removed when the receive right was "changed"
        !           807:  */
        !           808: void
        !           809: ipc_mqueue_destroy(
        !           810:        ipc_mqueue_t    mqueue) 
        !           811: {
        !           812:        ipc_kmsg_queue_t kmqueue;
        !           813:        ipc_kmsg_t kmsg;
        !           814: 
        !           815:        imq_lock(mqueue);
        !           816:        /*
        !           817:         *      rouse all blocked senders
        !           818:         */
        !           819:        mqueue->imq_fullwaiters = FALSE;
        !           820:        wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
        !           821:                                                                 IPC_MQUEUE_FULL,
        !           822:                                                                 THREAD_AWAKENED,
        !           823:                                                                 FALSE);
        !           824: 
        !           825:        kmqueue = &mqueue->imq_messages;
        !           826: 
        !           827:        while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
        !           828:                imq_unlock(mqueue);
        !           829:                ipc_kmsg_destroy_dest(kmsg);
        !           830:                imq_lock(mqueue);
        !           831:        }
        !           832:        imq_unlock(mqueue);
        !           833: }
        !           834: 
        !           835: /*
        !           836:  *     Routine:        ipc_mqueue_set_qlimit
        !           837:  *     Purpose:
        !           838:  *             Changes a message queue limit; the maximum number
        !           839:  *             of messages which may be queued.
        !           840:  *     Conditions:
        !           841:  *             Nothing locked.
        !           842:  */
        !           843: 
        !           844: void
        !           845: ipc_mqueue_set_qlimit(
        !           846:         ipc_mqueue_t                   mqueue,
        !           847:         mach_port_msgcount_t   qlimit)
        !           848: {
        !           849:         /* wake up senders allowed by the new qlimit */
        !           850:         imq_lock(mqueue);
        !           851:         if (qlimit > mqueue->imq_qlimit) {
        !           852:                 mach_port_msgcount_t i, wakeup;
        !           853: 
        !           854:                 /* caution: wakeup, qlimit are unsigned */
        !           855:                 wakeup = qlimit - mqueue->imq_qlimit;
        !           856: 
        !           857:                 for (i = 0; i < wakeup; i++) {
        !           858:                         if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
        !           859:                                                                                          IPC_MQUEUE_FULL,
        !           860:                                                                                          THREAD_AWAKENED,
        !           861:                                                                                          FALSE) == KERN_NOT_WAITING) {
        !           862:                                         mqueue->imq_fullwaiters = FALSE;
        !           863:                                         break;
        !           864:                         }
        !           865:                 }
        !           866:         }
        !           867:        mqueue->imq_qlimit = qlimit;
        !           868:        imq_unlock(mqueue);
        !           869: }
        !           870: 
        !           871: /*
        !           872:  *     Routine:        ipc_mqueue_set_seqno
        !           873:  *     Purpose:
        !           874:  *             Changes an mqueue's sequence number.
        !           875:  *     Conditions:
        !           876:  *             Caller holds a reference to the queue's containing object.
        !           877:  */
        !           878: void
        !           879: ipc_mqueue_set_seqno(
        !           880:        ipc_mqueue_t            mqueue,
        !           881:        mach_port_seqno_t       seqno)
        !           882: {
        !           883:        imq_lock(mqueue);
        !           884:        mqueue->imq_seqno = seqno;
        !           885:        imq_unlock(mqueue);
        !           886: }
        !           887: 
        !           888: 
        !           889: /*
        !           890:  *     Routine:        ipc_mqueue_copyin
        !           891:  *     Purpose:
        !           892:  *             Convert a name in a space to a message queue.
        !           893:  *     Conditions:
        !           894:  *             Nothing locked.  If successful, the caller gets a ref for
        !           895:  *             for the object. This ref ensures the continued existence of
        !           896:  *             the queue.
        !           897:  *     Returns:
        !           898:  *             MACH_MSG_SUCCESS        Found a message queue.
        !           899:  *             MACH_RCV_INVALID_NAME   The space is dead.
        !           900:  *             MACH_RCV_INVALID_NAME   The name doesn't denote a right.
        !           901:  *             MACH_RCV_INVALID_NAME
        !           902:  *                     The denoted right is not receive or port set.
        !           903:  *             MACH_RCV_IN_SET         Receive right is a member of a set.
        !           904:  */
        !           905: 
        !           906: mach_msg_return_t
        !           907: ipc_mqueue_copyin(
        !           908:        ipc_space_t             space,
        !           909:        mach_port_name_t        name,
        !           910:        ipc_mqueue_t            *mqueuep,
        !           911:        ipc_object_t            *objectp)
        !           912: {
        !           913:        ipc_entry_t entry;
        !           914:        ipc_object_t object;
        !           915:        ipc_mqueue_t mqueue;
        !           916: 
        !           917:        is_read_lock(space);
        !           918:        if (!space->is_active) {
        !           919:                is_read_unlock(space);
        !           920:                return MACH_RCV_INVALID_NAME;
        !           921:        }
        !           922: 
        !           923:        entry = ipc_entry_lookup(space, name);
        !           924:        if (entry == IE_NULL) {
        !           925:                is_read_unlock(space);
        !           926:                return MACH_RCV_INVALID_NAME;
        !           927:        }
        !           928: 
        !           929:        object = entry->ie_object;
        !           930: 
        !           931:        if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
        !           932:                ipc_port_t port;
        !           933:                ipc_pset_t pset;
        !           934: 
        !           935:                port = (ipc_port_t) object;
        !           936:                assert(port != IP_NULL);
        !           937: 
        !           938:                ip_lock(port);
        !           939:                assert(ip_active(port));
        !           940:                assert(port->ip_receiver_name == name);
        !           941:                assert(port->ip_receiver == space);
        !           942:                is_read_unlock(space);
        !           943:                mqueue = &port->ip_messages;
        !           944: 
        !           945:        } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
        !           946:                ipc_pset_t pset;
        !           947: 
        !           948:                pset = (ipc_pset_t) object;
        !           949:                assert(pset != IPS_NULL);
        !           950: 
        !           951:                ips_lock(pset);
        !           952:                assert(ips_active(pset));
        !           953:                assert(pset->ips_local_name == name);
        !           954:                is_read_unlock(space);
        !           955: 
        !           956:                mqueue = &pset->ips_messages;
        !           957:        } else {
        !           958:                is_read_unlock(space);
        !           959:                return MACH_RCV_INVALID_NAME;
        !           960:        }
        !           961: 
        !           962:        /*
        !           963:         *      At this point, the object is locked and active,
        !           964:         *      the space is unlocked, and mqueue is initialized.
        !           965:         */
        !           966: 
        !           967:        io_reference(object);
        !           968:        io_unlock(object);
        !           969: 
        !           970:        *objectp = object;
        !           971:        *mqueuep = mqueue;
        !           972:        return MACH_MSG_SUCCESS;
        !           973: }
        !           974: 

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.