|
|
1.1 ! root 1: /* ! 2: * Copyright (c) 2000 Apple Computer, Inc. All rights reserved. ! 3: * ! 4: * @APPLE_LICENSE_HEADER_START@ ! 5: * ! 6: * The contents of this file constitute Original Code as defined in and ! 7: * are subject to the Apple Public Source License Version 1.1 (the ! 8: * "License"). You may not use this file except in compliance with the ! 9: * License. Please obtain a copy of the License at ! 10: * http://www.apple.com/publicsource and read it before using this file. ! 11: * ! 12: * This Original Code and all software distributed under the License are ! 13: * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER ! 14: * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, ! 15: * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, ! 16: * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the ! 17: * License for the specific language governing rights and limitations ! 18: * under the License. ! 19: * ! 20: * @APPLE_LICENSE_HEADER_END@ ! 21: */ ! 22: /* ! 23: * @OSF_FREE_COPYRIGHT@ ! 24: */ ! 25: /* ! 26: * Mach Operating System ! 27: * Copyright (c) 1991,1990,1989 Carnegie Mellon University ! 28: * All Rights Reserved. ! 29: * ! 30: * Permission to use, copy, modify and distribute this software and its ! 31: * documentation is hereby granted, provided that both the copyright ! 32: * notice and this permission notice appear in all copies of the ! 33: * software, derivative works or modified versions, and any portions ! 34: * thereof, and that both notices appear in supporting documentation. ! 35: * ! 36: * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" ! 37: * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR ! 38: * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. ! 39: * ! 40: * Carnegie Mellon requests users of this software to return to ! 41: * ! 42: * Software Distribution Coordinator or [email protected] ! 43: * School of Computer Science ! 44: * Carnegie Mellon University ! 45: * Pittsburgh PA 15213-3890 ! 46: * ! 47: * any improvements or extensions that they make and grant Carnegie Mellon ! 48: * the rights to redistribute these changes. ! 49: */ ! 50: /* ! 51: */ ! 52: /* ! 53: * File: ipc/ipc_mqueue.c ! 54: * Author: Rich Draves ! 55: * Date: 1989 ! 56: * ! 57: * Functions to manipulate IPC message queues. ! 58: */ ! 59: ! 60: #include <dipc.h> ! 61: ! 62: #include <mach/port.h> ! 63: #include <mach/message.h> ! 64: #include <mach/sync_policy.h> ! 65: ! 66: #include <kern/assert.h> ! 67: #include <kern/counters.h> ! 68: #include <kern/sched_prim.h> ! 69: #include <kern/ipc_kobject.h> ! 70: #include <kern/misc_protos.h> ! 71: #include <kern/task.h> ! 72: #include <kern/thread.h> ! 73: #include <kern/wait_queue.h> ! 74: ! 75: #include <ipc/ipc_mqueue.h> ! 76: #include <ipc/ipc_kmsg.h> ! 77: #include <ipc/ipc_port.h> ! 78: #include <ipc/ipc_pset.h> ! 79: #include <ipc/ipc_space.h> ! 80: ! 81: #include <ddb/tr.h> ! 82: ! 83: int ipc_mqueue_full; /* address is event for queue space */ ! 84: int ipc_mqueue_rcv; /* address is event for message arrival */ ! 85: ! 86: #define TR_ENABLE 0 ! 87: ! 88: /* ! 89: * Routine: ipc_mqueue_init ! 90: * Purpose: ! 91: * Initialize a newly-allocated message queue. ! 92: */ ! 93: void ! 94: ipc_mqueue_init( ! 95: ipc_mqueue_t mqueue, ! 96: boolean_t is_set) ! 97: { ! 98: if (is_set) { ! 99: wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO); ! 100: } else { ! 101: wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO); ! 102: ipc_kmsg_queue_init(&mqueue->imq_messages); ! 103: mqueue->imq_seqno = 0; ! 104: mqueue->imq_msgcount = 0; ! 105: mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT; ! 106: mqueue->imq_fullwaiters = FALSE; ! 107: } ! 108: } ! 109: ! 110: /* ! 111: * Routine: ipc_mqueue_member ! 112: * Purpose: ! 113: * Indicate whether the (port) mqueue is a member of ! 114: * this portset's mqueue. We do this by checking ! 115: * whether the portset mqueue's waitq is an member of ! 116: * the port's mqueue waitq. ! 117: * Conditions: ! 118: * the portset's mqueue is not already a member ! 119: * this may block while allocating linkage structures. ! 120: */ ! 121: ! 122: boolean_t ! 123: ipc_mqueue_member( ! 124: ipc_mqueue_t port_mqueue, ! 125: ipc_mqueue_t set_mqueue) ! 126: { ! 127: wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; ! 128: wait_queue_t set_waitq = &set_mqueue->imq_wait_queue; ! 129: ! 130: return (wait_queue_member(port_waitq, set_waitq)); ! 131: ! 132: } ! 133: ! 134: /* ! 135: * Routine: ipc_mqueue_remove ! 136: * Purpose: ! 137: * Remove the association between the queue and the specified ! 138: * subordinate message queue. ! 139: */ ! 140: ! 141: kern_return_t ! 142: ipc_mqueue_remove( ! 143: ipc_mqueue_t mqueue, ! 144: ipc_mqueue_t sub_mqueue) ! 145: { ! 146: wait_queue_t mq_waitq = &mqueue->imq_wait_queue; ! 147: wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue; ! 148: ! 149: if (wait_queue_member(mq_waitq, sub_waitq)) { ! 150: wait_queue_unlink(mq_waitq, sub_waitq); ! 151: return KERN_SUCCESS; ! 152: } ! 153: return KERN_NOT_IN_SET; ! 154: } ! 155: ! 156: /* ! 157: * Routine: ipc_mqueue_remove_one ! 158: * Purpose: ! 159: * Find and remove one subqueue from the queue. ! 160: * Conditions: ! 161: * Will return the set mqueue that was removed ! 162: */ ! 163: void ! 164: ipc_mqueue_remove_one( ! 165: ipc_mqueue_t mqueue, ! 166: ipc_mqueue_t *sub_queuep) ! 167: { ! 168: wait_queue_t mq_waitq = &mqueue->imq_wait_queue; ! 169: ! 170: wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep); ! 171: return; ! 172: } ! 173: ! 174: ! 175: /* ! 176: * Routine: ipc_mqueue_add ! 177: * Purpose: ! 178: * Associate the portset's mqueue with the port's mqueue. ! 179: * This has to be done so that posting the port will wakeup ! 180: * a portset waiter. If there are waiters on the portset ! 181: * mqueue and messages on the port mqueue, try to match them ! 182: * up now. ! 183: * Conditions: ! 184: * May block. ! 185: */ ! 186: kern_return_t ! 187: ipc_mqueue_add( ! 188: ipc_mqueue_t port_mqueue, ! 189: ipc_mqueue_t set_mqueue) ! 190: { ! 191: wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; ! 192: wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue; ! 193: ipc_kmsg_queue_t kmsgq; ! 194: ipc_kmsg_t kmsg, next; ! 195: kern_return_t kr; ! 196: ! 197: kr = wait_queue_link(port_waitq, set_waitq); ! 198: if (kr != KERN_SUCCESS) ! 199: return kr; ! 200: ! 201: /* ! 202: * Now that the set has been added to the port, there may be ! 203: * messages queued on the port and threads waiting on the set ! 204: * waitq. Lets get them together. ! 205: */ ! 206: imq_lock(port_mqueue); ! 207: kmsgq = &port_mqueue->imq_messages; ! 208: for (kmsg = ipc_kmsg_queue_first(kmsgq); ! 209: kmsg != IKM_NULL; ! 210: kmsg = next) { ! 211: next = ipc_kmsg_queue_next(kmsgq, kmsg); ! 212: ! 213: for (;;) { ! 214: thread_t th; ! 215: spl_t s; ! 216: ! 217: s = splsched(); ! 218: th = wait_queue_wakeup_identity_locked(port_waitq, ! 219: IPC_MQUEUE_RECEIVE, ! 220: THREAD_AWAKENED, ! 221: FALSE); ! 222: /* waitq/mqueue still locked, thread locked */ ! 223: ! 224: if (th == THREAD_NULL) { ! 225: splx(s); ! 226: goto leave; ! 227: } ! 228: ! 229: /* ! 230: * Found a receiver. see if they can handle the message ! 231: * correctly (the message is not too large for them, or ! 232: * they didn't care to be informed that the message was ! 233: * too large). If they can't handle it, take them off ! 234: * the list and let them go back and figure it out and ! 235: * just move onto the next. ! 236: */ ! 237: if (th->ith_msize < ! 238: kmsg->ikm_header.msgh_size + ! 239: REQUESTED_TRAILER_SIZE(th->ith_option)) { ! 240: th->ith_state = MACH_RCV_TOO_LARGE; ! 241: th->ith_msize = kmsg->ikm_header.msgh_size; ! 242: if (th->ith_option & MACH_RCV_LARGE) { ! 243: /* ! 244: * let him go without message ! 245: */ ! 246: th->ith_kmsg = IKM_NULL; ! 247: th->ith_seqno = 0; ! 248: thread_unlock(th); ! 249: splx(s); ! 250: continue; /* find another thread */ ! 251: } ! 252: } else { ! 253: th->ith_state = MACH_MSG_SUCCESS; ! 254: } ! 255: ! 256: /* ! 257: * This thread is going to take this message, ! 258: * so give it to him. ! 259: */ ! 260: ipc_mqueue_release_msgcount(port_mqueue); ! 261: ipc_kmsg_rmqueue(kmsgq, kmsg); ! 262: th->ith_kmsg = kmsg; ! 263: th->ith_seqno = port_mqueue->imq_seqno++; ! 264: thread_unlock(th); ! 265: splx(s); ! 266: break; /* go to next message */ ! 267: } ! 268: ! 269: } ! 270: leave: ! 271: imq_unlock(port_mqueue); ! 272: return KERN_SUCCESS; ! 273: } ! 274: ! 275: /* ! 276: * Routine: ipc_mqueue_changed ! 277: * Purpose: ! 278: * Wake up receivers waiting in a message queue. ! 279: * Conditions: ! 280: * The message queue is locked. ! 281: */ ! 282: ! 283: void ! 284: ipc_mqueue_changed( ! 285: ipc_mqueue_t mqueue) ! 286: { ! 287: wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue, ! 288: IPC_MQUEUE_RECEIVE, ! 289: THREAD_RESTART, ! 290: FALSE); /* unlock waitq? */ ! 291: } ! 292: ! 293: ! 294: ! 295: ! 296: /* ! 297: * Routine: ipc_mqueue_send ! 298: * Purpose: ! 299: * Send a message to a message queue. The message holds a reference ! 300: * for the destination port for this message queue in the ! 301: * msgh_remote_port field. ! 302: * ! 303: * If unsuccessful, the caller still has possession of ! 304: * the message and must do something with it. If successful, ! 305: * the message is queued, given to a receiver, or destroyed. ! 306: * Conditions: ! 307: * Nothing locked. ! 308: * Returns: ! 309: * MACH_MSG_SUCCESS The message was accepted. ! 310: * MACH_SEND_TIMED_OUT Caller still has message. ! 311: * MACH_SEND_INTERRUPTED Caller still has message. ! 312: */ ! 313: mach_msg_return_t ! 314: ipc_mqueue_send( ! 315: ipc_mqueue_t mqueue, ! 316: ipc_kmsg_t kmsg, ! 317: mach_msg_option_t option, ! 318: mach_msg_timeout_t timeout) ! 319: { ! 320: int save_wait_result; ! 321: ! 322: /* ! 323: * Don't block if: ! 324: * 1) We're under the queue limit. ! 325: * 2) Caller used the MACH_SEND_ALWAYS internal option. ! 326: * 3) Message is sent to a send-once right. ! 327: */ ! 328: imq_lock(mqueue); ! 329: ! 330: if (!imq_full(mqueue) || ! 331: (option & MACH_SEND_ALWAYS) || ! 332: (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) == ! 333: MACH_MSG_TYPE_PORT_SEND_ONCE)) { ! 334: mqueue->imq_msgcount++; ! 335: imq_unlock(mqueue); ! 336: ! 337: } else { ! 338: ! 339: /* ! 340: * We have to wait for space to be granted to us. ! 341: */ ! 342: if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) { ! 343: imq_unlock(mqueue); ! 344: return MACH_SEND_TIMED_OUT; ! 345: } ! 346: mqueue->imq_fullwaiters = TRUE; ! 347: wait_queue_assert_wait_locked(&mqueue->imq_wait_queue, ! 348: IPC_MQUEUE_FULL, ! 349: THREAD_ABORTSAFE, ! 350: TRUE); /* unlock? */ ! 351: /* wait/mqueue is unlocked */ ! 352: ! 353: if (option & MACH_SEND_TIMEOUT) ! 354: thread_set_timer(timeout, 1000*NSEC_PER_USEC); ! 355: ! 356: counter(c_ipc_mqueue_send_block++); ! 357: save_wait_result = thread_block((void (*)(void)) 0); ! 358: ! 359: if (option & MACH_SEND_TIMEOUT) ! 360: thread_cancel_timer(); ! 361: ! 362: ! 363: switch (save_wait_result) { ! 364: case THREAD_TIMED_OUT: ! 365: assert(option & MACH_SEND_TIMEOUT); ! 366: return MACH_SEND_TIMED_OUT; ! 367: ! 368: case THREAD_AWAKENED: ! 369: break; /* we can proceed - inherited msgcount from waker */ ! 370: ! 371: case THREAD_INTERRUPTED: ! 372: return MACH_SEND_INTERRUPTED; ! 373: ! 374: case THREAD_RESTART: ! 375: default: ! 376: panic("ipc_mqueue_send"); ! 377: } ! 378: } ! 379: ! 380: ipc_mqueue_post(mqueue, kmsg); ! 381: return MACH_MSG_SUCCESS; ! 382: } ! 383: ! 384: /* ! 385: * Routine: ipc_mqueue_release_msgcount ! 386: * Purpose: ! 387: * Release a message queue reference in the case where we ! 388: * found a waiter. ! 389: * ! 390: * Conditions: ! 391: * The message queue is locked ! 392: */ ! 393: void ! 394: ipc_mqueue_release_msgcount( ! 395: ipc_mqueue_t mqueue) ! 396: { ! 397: assert(imq_held(mqueue)); ! 398: assert(mqueue->imq_msgcount > 0); ! 399: ! 400: mqueue->imq_msgcount--; ! 401: if (!imq_full(mqueue) && mqueue->imq_fullwaiters) { ! 402: if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue, ! 403: IPC_MQUEUE_FULL, ! 404: THREAD_AWAKENED, ! 405: FALSE) != KERN_SUCCESS) { ! 406: mqueue->imq_fullwaiters = FALSE; ! 407: } else { ! 408: mqueue->imq_msgcount++; /* gave it away */ ! 409: } ! 410: } ! 411: } ! 412: ! 413: /* ! 414: * Routine: ipc_mqueue_post ! 415: * Purpose: ! 416: * Post a message to a waiting receiver or enqueue it. If a ! 417: * receiver is waiting, we can release our reserved space in ! 418: * the message queue. ! 419: * ! 420: * Conditions: ! 421: * If we need to queue, our space in the message queue is reserved. ! 422: */ ! 423: void ! 424: ipc_mqueue_post( ! 425: register ipc_mqueue_t mqueue, ! 426: register ipc_kmsg_t kmsg) ! 427: { ! 428: ! 429: /* ! 430: * While the msg queue is locked, we have control of the ! 431: * kmsg, so the ref in it for the port is still good. ! 432: * ! 433: * Check for a receiver for the message. ! 434: */ ! 435: imq_lock(mqueue); ! 436: for (;;) { ! 437: wait_queue_t waitq = &mqueue->imq_wait_queue; ! 438: thread_t receiver; ! 439: spl_t s; ! 440: ! 441: s = splsched(); ! 442: receiver = wait_queue_wakeup_identity_locked(waitq, ! 443: IPC_MQUEUE_RECEIVE, ! 444: THREAD_AWAKENED, ! 445: FALSE); ! 446: /* waitq still locked, thread locked */ ! 447: ! 448: if (receiver == THREAD_NULL) { ! 449: /* ! 450: * no receivers; queue kmsg ! 451: */ ! 452: assert(mqueue->imq_msgcount > 0); ! 453: ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg); ! 454: splx(s); ! 455: break; ! 456: } ! 457: ! 458: /* ! 459: * We found a waiting thread. ! 460: * If the message is too large or the scatter list is too small ! 461: * the thread we wake up will get that as its status. ! 462: */ ! 463: if (receiver->ith_msize < ! 464: (kmsg->ikm_header.msgh_size) + ! 465: REQUESTED_TRAILER_SIZE(receiver->ith_option)) { ! 466: receiver->ith_msize = kmsg->ikm_header.msgh_size; ! 467: receiver->ith_state = MACH_RCV_TOO_LARGE; ! 468: } else { ! 469: receiver->ith_state = ! 470: #ifdef DOITFORSCATTER ! 471: (receiver->ith_scatter_list != MACH_MSG_BODY_NULL) ? ! 472: ipc_kmsg_check_scatter(kmsg, ! 473: receiver->ith_option, ! 474: &receiver->ith_scatter_list, ! 475: &receiver->ith_scatter_list_size) : ! 476: #endif /* DOITFORSCATTER */ ! 477: MACH_MSG_SUCCESS; ! 478: } ! 479: ! 480: /* ! 481: * If there is no problem with the upcoming receive, or the ! 482: * receiver thread didn't specifically ask for special too ! 483: * large error condition, go ahead and select it anyway. ! 484: */ ! 485: if ((receiver->ith_state == MACH_MSG_SUCCESS) || ! 486: !(receiver->ith_option & MACH_RCV_LARGE)) { ! 487: ! 488: receiver->ith_kmsg = kmsg; ! 489: receiver->ith_seqno = mqueue->imq_seqno++; ! 490: thread_unlock(receiver); ! 491: splx(s); ! 492: ! 493: /* we didn't need our reserved spot in the queue */ ! 494: ipc_mqueue_release_msgcount(mqueue); ! 495: break; ! 496: } ! 497: ! 498: /* ! 499: * Otherwise, this thread needs to be released to run ! 500: * and handle its error without getting the message. We ! 501: * need to go back and pick another one. ! 502: */ ! 503: receiver->ith_kmsg = IKM_NULL; ! 504: receiver->ith_seqno = 0; ! 505: thread_unlock(receiver); ! 506: splx(s); ! 507: } ! 508: ! 509: imq_unlock(mqueue); ! 510: current_task()->messages_sent++; ! 511: return; ! 512: } ! 513: ! 514: ! 515: /* ! 516: * Routine: ipc_mqueue_receive ! 517: * Purpose: ! 518: * Receive a message from a message queue. ! 519: * ! 520: * If continuation is non-zero, then we might discard ! 521: * our kernel stack when we block. We will continue ! 522: * after unblocking by executing continuation. ! 523: * ! 524: * If resume is true, then we are resuming a receive ! 525: * operation after a blocked receive discarded our stack. ! 526: * Conditions: ! 527: * Our caller must hold a reference for the port or port set ! 528: * to which this queue belongs, to keep the queue ! 529: * from being deallocated. ! 530: * ! 531: * The kmsg is returned with clean header fields ! 532: * and with the circular bit turned off. ! 533: * Returns: ! 534: * MACH_MSG_SUCCESS Message returned in kmsgp. ! 535: * MACH_RCV_TOO_LARGE Message size returned in kmsgp. ! 536: * MACH_RCV_TIMED_OUT No message obtained. ! 537: * MACH_RCV_INTERRUPTED No message obtained. ! 538: * MACH_RCV_PORT_DIED Port/set died; no message. ! 539: * MACH_RCV_PORT_CHANGED Port moved into set; no msg. ! 540: * ! 541: */ ! 542: ! 543: mach_msg_return_t ! 544: ipc_mqueue_receive( ! 545: ipc_mqueue_t mqueue, ! 546: mach_msg_option_t option, ! 547: mach_msg_size_t max_size, ! 548: mach_msg_timeout_t timeout, ! 549: int interruptible, ! 550: ipc_kmsg_t *kmsgp, ! 551: mach_port_seqno_t *seqnop) ! 552: { ! 553: ipc_port_t port; ! 554: mach_msg_return_t mr, mr2; ! 555: ipc_kmsg_queue_t kmsgs; ! 556: kern_return_t save_wait_result; ! 557: thread_t self; ! 558: ! 559: imq_lock(mqueue); ! 560: ! 561: if (imq_is_set(mqueue)) { ! 562: wait_queue_link_t wql; ! 563: ipc_mqueue_t port_mq; ! 564: queue_t q; ! 565: ! 566: q = &mqueue->imq_setlinks; ! 567: ! 568: /* ! 569: * If we are waiting on a portset mqueue, we need to see if ! 570: * any of the member ports have work for us. If so, try to ! 571: * deliver one of those messages. By holding the portset's ! 572: * mqueue lock during the search, we tie up any attempts by ! 573: * mqueue_deliver or portset membership changes that may ! 574: * cross our path. But this is a lock order violation, so we ! 575: * have to do it "softly." If we don't find a message waiting ! 576: * for us, we will assert our intention to wait while still ! 577: * holding that lock. When we release the lock, the deliver/ ! 578: * change will succeed and find us. ! 579: */ ! 580: search_set: ! 581: queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) { ! 582: port_mq = (ipc_mqueue_t)wql->wql_queue; ! 583: kmsgs = &port_mq->imq_messages; ! 584: ! 585: if (!ipc_kmsg_queue_first(kmsgs)) /* peek first */ ! 586: continue; ! 587: ! 588: if (!imq_lock_try(port_mq)) { ! 589: imq_unlock(mqueue); ! 590: imq_lock(mqueue); ! 591: goto search_set; /* start again at beginning - SMP */ ! 592: } ! 593: ! 594: /* ! 595: * If there is still a message to be had, we will ! 596: * try to select it (may not succeed because of size ! 597: * and options). In any case, we deliver those ! 598: * results back to the user. ! 599: * ! 600: * We also move the port's linkage to the tail of the ! 601: * list for this set (fairness). Future versions will ! 602: * sort by timestamp or priority. ! 603: */ ! 604: if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) { ! 605: imq_unlock(port_mq); ! 606: continue; ! 607: } ! 608: queue_remove(q, wql, wait_queue_link_t, wql_sublinks); ! 609: queue_enter(q, wql, wait_queue_link_t, wql_sublinks); ! 610: imq_unlock(mqueue); ! 611: ! 612: mr = ipc_mqueue_select(port_mq, option, max_size, kmsgp, seqnop); ! 613: imq_unlock(port_mq); ! 614: return mr; ! 615: ! 616: } ! 617: ! 618: } else { ! 619: ! 620: /* ! 621: * Receive on a single port. Just try to get the messages. ! 622: */ ! 623: kmsgs = &mqueue->imq_messages; ! 624: if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { ! 625: mr = ipc_mqueue_select(mqueue, option, max_size, kmsgp, seqnop); ! 626: imq_unlock(mqueue); ! 627: return mr; ! 628: } ! 629: } ! 630: ! 631: /* ! 632: * Looks like we'll have to block. The mqueue we will ! 633: * block on (whether the set's or the local port's) is ! 634: * still locked. ! 635: */ ! 636: if (option & MACH_RCV_TIMEOUT) { ! 637: if (timeout == 0) { ! 638: imq_unlock(mqueue); ! 639: return MACH_RCV_TIMED_OUT; ! 640: } ! 641: } ! 642: self = current_thread(); ! 643: ! 644: self->ith_state = MACH_RCV_IN_PROGRESS; ! 645: self->ith_option = option; ! 646: self->ith_msize = max_size; ! 647: ! 648: wait_queue_assert_wait_locked(&mqueue->imq_wait_queue, ! 649: IPC_MQUEUE_RECEIVE, ! 650: interruptible, ! 651: TRUE); /* unlock? */ ! 652: /* mqueue/waitq is unlocked */ ! 653: ! 654: if (option & MACH_RCV_TIMEOUT) { ! 655: thread_set_timer(timeout, 1000*NSEC_PER_USEC); ! 656: } ! 657: ! 658: if (interruptible == THREAD_ABORTSAFE) { ! 659: counter(c_ipc_mqueue_receive_block_user++); ! 660: } else { ! 661: counter(c_ipc_mqueue_receive_block_kernel++); ! 662: } ! 663: ! 664: save_wait_result = thread_block((void (*)(void))0); ! 665: ! 666: if (option & MACH_RCV_TIMEOUT) ! 667: thread_cancel_timer(); ! 668: ! 669: /* ! 670: * why did we wake up? ! 671: */ ! 672: switch (save_wait_result) { ! 673: case THREAD_TIMED_OUT: ! 674: return MACH_RCV_TIMED_OUT; ! 675: ! 676: case THREAD_INTERRUPTED: ! 677: return MACH_RCV_INTERRUPTED; ! 678: ! 679: case THREAD_RESTART: ! 680: /* something bad happened to the port/set */ ! 681: return MACH_RCV_PORT_CHANGED; ! 682: ! 683: case THREAD_AWAKENED: ! 684: /* ! 685: * We do not need to go select a message, somebody ! 686: * handed us one (or a too-large indication). ! 687: */ ! 688: ! 689: mr = MACH_MSG_SUCCESS; ! 690: ! 691: switch (self->ith_state) { ! 692: case MACH_RCV_SCATTER_SMALL: ! 693: case MACH_RCV_TOO_LARGE: ! 694: /* ! 695: * Somebody tried to give us a too large ! 696: * message. If we indicated that we cared, ! 697: * then they only gave us the indication, ! 698: * otherwise they gave us the indication ! 699: * AND the message anyway. ! 700: */ ! 701: if (option & MACH_RCV_LARGE) { ! 702: if (self->ith_state == MACH_RCV_TOO_LARGE) ! 703: *kmsgp = (ipc_kmsg_t)self->ith_msize; ! 704: return(self->ith_state); ! 705: } else { ! 706: mr = self->ith_state; ! 707: /* fall thru to SUCCESS case */ ! 708: } ! 709: ! 710: case MACH_MSG_SUCCESS: ! 711: /* pick up the message that was handed to us */ ! 712: *kmsgp = self->ith_kmsg; ! 713: *seqnop = self->ith_seqno; ! 714: return mr; ! 715: ! 716: default: ! 717: panic("ipc_mqueue_receive: strange ith_state"); ! 718: } ! 719: ! 720: default: ! 721: panic("ipc_mqueue_receive: strange wait_result"); ! 722: } ! 723: } ! 724: ! 725: ! 726: /* ! 727: * Routine: ipc_mqueue_select ! 728: * Purpose: ! 729: * A receiver discovered that there was a message on the queue ! 730: * before he had to block. Pick the message off the queue and ! 731: * "post" it to himself. ! 732: * Conditions: ! 733: * mqueue locked. ! 734: * There is a message. ! 735: * Returns: ! 736: * MACH_MSG_SUCCESS Actually selected a message for ourselves. ! 737: * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large ! 738: */ ! 739: kern_return_t ! 740: ipc_mqueue_select( ! 741: ipc_mqueue_t mqueue, ! 742: mach_msg_option_t option, ! 743: mach_msg_size_t max_size, ! 744: ipc_kmsg_t *kmsgp, ! 745: mach_port_seqno_t *seqnop) ! 746: { ! 747: ipc_kmsg_t kmsg; ! 748: mach_port_seqno_t seqno; ! 749: mach_msg_return_t mr; ! 750: ! 751: mr = MACH_MSG_SUCCESS; ! 752: ! 753: ! 754: /* ! 755: * Do some sanity checking of our ability to receive ! 756: * before pulling the message off the queue. ! 757: */ ! 758: kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages); ! 759: ! 760: assert(kmsg != IKM_NULL); ! 761: ! 762: if (kmsg->ikm_header.msgh_size + ! 763: REQUESTED_TRAILER_SIZE(option) > max_size) { ! 764: mr = MACH_RCV_TOO_LARGE; ! 765: } ! 766: #ifdef DOITFORSCATTER ! 767: else if (self->ith_scatter_list != MACH_MSG_BODY_NULL) { ! 768: mr = ipc_kmsg_check_scatter( ! 769: kmsg, ! 770: self->ith_option, ! 771: &self->ith_scatter_list, ! 772: &self->ith_scatter_list_size); ! 773: } ! 774: #endif /* DOITFORSCATTER */ ! 775: ! 776: /* ! 777: * If we really can't receive it, but we had the ! 778: * MACH_RCV_LARGE option set, then don't take it off ! 779: * the queue, instead return the appropriate error ! 780: * (and size needed). ! 781: */ ! 782: if (((mr == MACH_RCV_TOO_LARGE) || ! 783: (mr == MACH_RCV_SCATTER_SMALL)) && ! 784: (option & MACH_RCV_LARGE)) { ! 785: *seqnop = 0; ! 786: *kmsgp = (ipc_kmsg_t)kmsg->ikm_header.msgh_size; ! 787: return mr; ! 788: } ! 789: ! 790: ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg); ! 791: ipc_mqueue_release_msgcount(mqueue); ! 792: *seqnop = mqueue->imq_seqno++; ! 793: *kmsgp = kmsg; ! 794: ! 795: current_task()->messages_received++; ! 796: return mr; ! 797: } ! 798: ! 799: /* ! 800: * Routine: ipc_mqueue_destroy ! 801: * Purpose: ! 802: * Destroy a message queue. Set any blocked senders running. ! 803: * Destroy the kmsgs in the queue. ! 804: * Conditions: ! 805: * Nothing locked. ! 806: * Receivers were removed when the receive right was "changed" ! 807: */ ! 808: void ! 809: ipc_mqueue_destroy( ! 810: ipc_mqueue_t mqueue) ! 811: { ! 812: ipc_kmsg_queue_t kmqueue; ! 813: ipc_kmsg_t kmsg; ! 814: ! 815: imq_lock(mqueue); ! 816: /* ! 817: * rouse all blocked senders ! 818: */ ! 819: mqueue->imq_fullwaiters = FALSE; ! 820: wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue, ! 821: IPC_MQUEUE_FULL, ! 822: THREAD_AWAKENED, ! 823: FALSE); ! 824: ! 825: kmqueue = &mqueue->imq_messages; ! 826: ! 827: while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) { ! 828: imq_unlock(mqueue); ! 829: ipc_kmsg_destroy_dest(kmsg); ! 830: imq_lock(mqueue); ! 831: } ! 832: imq_unlock(mqueue); ! 833: } ! 834: ! 835: /* ! 836: * Routine: ipc_mqueue_set_qlimit ! 837: * Purpose: ! 838: * Changes a message queue limit; the maximum number ! 839: * of messages which may be queued. ! 840: * Conditions: ! 841: * Nothing locked. ! 842: */ ! 843: ! 844: void ! 845: ipc_mqueue_set_qlimit( ! 846: ipc_mqueue_t mqueue, ! 847: mach_port_msgcount_t qlimit) ! 848: { ! 849: /* wake up senders allowed by the new qlimit */ ! 850: imq_lock(mqueue); ! 851: if (qlimit > mqueue->imq_qlimit) { ! 852: mach_port_msgcount_t i, wakeup; ! 853: ! 854: /* caution: wakeup, qlimit are unsigned */ ! 855: wakeup = qlimit - mqueue->imq_qlimit; ! 856: ! 857: for (i = 0; i < wakeup; i++) { ! 858: if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue, ! 859: IPC_MQUEUE_FULL, ! 860: THREAD_AWAKENED, ! 861: FALSE) == KERN_NOT_WAITING) { ! 862: mqueue->imq_fullwaiters = FALSE; ! 863: break; ! 864: } ! 865: } ! 866: } ! 867: mqueue->imq_qlimit = qlimit; ! 868: imq_unlock(mqueue); ! 869: } ! 870: ! 871: /* ! 872: * Routine: ipc_mqueue_set_seqno ! 873: * Purpose: ! 874: * Changes an mqueue's sequence number. ! 875: * Conditions: ! 876: * Caller holds a reference to the queue's containing object. ! 877: */ ! 878: void ! 879: ipc_mqueue_set_seqno( ! 880: ipc_mqueue_t mqueue, ! 881: mach_port_seqno_t seqno) ! 882: { ! 883: imq_lock(mqueue); ! 884: mqueue->imq_seqno = seqno; ! 885: imq_unlock(mqueue); ! 886: } ! 887: ! 888: ! 889: /* ! 890: * Routine: ipc_mqueue_copyin ! 891: * Purpose: ! 892: * Convert a name in a space to a message queue. ! 893: * Conditions: ! 894: * Nothing locked. If successful, the caller gets a ref for ! 895: * for the object. This ref ensures the continued existence of ! 896: * the queue. ! 897: * Returns: ! 898: * MACH_MSG_SUCCESS Found a message queue. ! 899: * MACH_RCV_INVALID_NAME The space is dead. ! 900: * MACH_RCV_INVALID_NAME The name doesn't denote a right. ! 901: * MACH_RCV_INVALID_NAME ! 902: * The denoted right is not receive or port set. ! 903: * MACH_RCV_IN_SET Receive right is a member of a set. ! 904: */ ! 905: ! 906: mach_msg_return_t ! 907: ipc_mqueue_copyin( ! 908: ipc_space_t space, ! 909: mach_port_name_t name, ! 910: ipc_mqueue_t *mqueuep, ! 911: ipc_object_t *objectp) ! 912: { ! 913: ipc_entry_t entry; ! 914: ipc_object_t object; ! 915: ipc_mqueue_t mqueue; ! 916: ! 917: is_read_lock(space); ! 918: if (!space->is_active) { ! 919: is_read_unlock(space); ! 920: return MACH_RCV_INVALID_NAME; ! 921: } ! 922: ! 923: entry = ipc_entry_lookup(space, name); ! 924: if (entry == IE_NULL) { ! 925: is_read_unlock(space); ! 926: return MACH_RCV_INVALID_NAME; ! 927: } ! 928: ! 929: object = entry->ie_object; ! 930: ! 931: if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) { ! 932: ipc_port_t port; ! 933: ipc_pset_t pset; ! 934: ! 935: port = (ipc_port_t) object; ! 936: assert(port != IP_NULL); ! 937: ! 938: ip_lock(port); ! 939: assert(ip_active(port)); ! 940: assert(port->ip_receiver_name == name); ! 941: assert(port->ip_receiver == space); ! 942: is_read_unlock(space); ! 943: mqueue = &port->ip_messages; ! 944: ! 945: } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) { ! 946: ipc_pset_t pset; ! 947: ! 948: pset = (ipc_pset_t) object; ! 949: assert(pset != IPS_NULL); ! 950: ! 951: ips_lock(pset); ! 952: assert(ips_active(pset)); ! 953: assert(pset->ips_local_name == name); ! 954: is_read_unlock(space); ! 955: ! 956: mqueue = &pset->ips_messages; ! 957: } else { ! 958: is_read_unlock(space); ! 959: return MACH_RCV_INVALID_NAME; ! 960: } ! 961: ! 962: /* ! 963: * At this point, the object is locked and active, ! 964: * the space is unlocked, and mqueue is initialized. ! 965: */ ! 966: ! 967: io_reference(object); ! 968: io_unlock(object); ! 969: ! 970: *objectp = object; ! 971: *mqueuep = mqueue; ! 972: return MACH_MSG_SUCCESS; ! 973: } ! 974:
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.