|
|
1.1 root 1: /*
2: * Copyright (c) 2000 Apple Computer, Inc. All rights reserved.
3: *
4: * @APPLE_LICENSE_HEADER_START@
5: *
6: * The contents of this file constitute Original Code as defined in and
7: * are subject to the Apple Public Source License Version 1.1 (the
8: * "License"). You may not use this file except in compliance with the
9: * License. Please obtain a copy of the License at
10: * http://www.apple.com/publicsource and read it before using this file.
11: *
12: * This Original Code and all software distributed under the License are
13: * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
14: * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
15: * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
16: * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the
17: * License for the specific language governing rights and limitations
18: * under the License.
19: *
20: * @APPLE_LICENSE_HEADER_END@
21: */
22: /*
23: * @OSF_FREE_COPYRIGHT@
24: */
25: /*
26: * Mach Operating System
27: * Copyright (c) 1991,1990,1989 Carnegie Mellon University
28: * All Rights Reserved.
29: *
30: * Permission to use, copy, modify and distribute this software and its
31: * documentation is hereby granted, provided that both the copyright
32: * notice and this permission notice appear in all copies of the
33: * software, derivative works or modified versions, and any portions
34: * thereof, and that both notices appear in supporting documentation.
35: *
36: * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
37: * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
38: * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
39: *
40: * Carnegie Mellon requests users of this software to return to
41: *
42: * Software Distribution Coordinator or [email protected]
43: * School of Computer Science
44: * Carnegie Mellon University
45: * Pittsburgh PA 15213-3890
46: *
47: * any improvements or extensions that they make and grant Carnegie Mellon
48: * the rights to redistribute these changes.
49: */
50: /*
51: */
52: /*
53: * File: ipc/ipc_mqueue.c
54: * Author: Rich Draves
55: * Date: 1989
56: *
57: * Functions to manipulate IPC message queues.
58: */
59:
60: #include <dipc.h>
61:
62: #include <mach/port.h>
63: #include <mach/message.h>
64: #include <mach/sync_policy.h>
65:
66: #include <kern/assert.h>
67: #include <kern/counters.h>
68: #include <kern/sched_prim.h>
69: #include <kern/ipc_kobject.h>
70: #include <kern/misc_protos.h>
71: #include <kern/task.h>
72: #include <kern/thread.h>
73: #include <kern/wait_queue.h>
74:
75: #include <ipc/ipc_mqueue.h>
76: #include <ipc/ipc_kmsg.h>
77: #include <ipc/ipc_port.h>
78: #include <ipc/ipc_pset.h>
79: #include <ipc/ipc_space.h>
80:
81: #include <ddb/tr.h>
82:
/* Only the address matters: it names the event for queue space. */
int ipc_mqueue_full;
/* Only the address matters: it names the event for message arrival. */
int ipc_mqueue_rcv;

/* NOTE(review): presumably a switch read by <ddb/tr.h>; 0 here -- confirm. */
#define TR_ENABLE 0
87:
88: /*
89: * Routine: ipc_mqueue_init
90: * Purpose:
91: * Initialize a newly-allocated message queue.
92: */
93: void
94: ipc_mqueue_init(
95: ipc_mqueue_t mqueue,
96: boolean_t is_set)
97: {
98: if (is_set) {
99: wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
100: } else {
101: wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
102: ipc_kmsg_queue_init(&mqueue->imq_messages);
103: mqueue->imq_seqno = 0;
104: mqueue->imq_msgcount = 0;
105: mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
106: mqueue->imq_fullwaiters = FALSE;
107: }
108: }
109:
110: /*
111: * Routine: ipc_mqueue_member
112: * Purpose:
113: * Indicate whether the (port) mqueue is a member of
114: * this portset's mqueue. We do this by checking
115: * whether the portset mqueue's waitq is an member of
116: * the port's mqueue waitq.
117: * Conditions:
118: * the portset's mqueue is not already a member
119: * this may block while allocating linkage structures.
120: */
121:
122: boolean_t
123: ipc_mqueue_member(
124: ipc_mqueue_t port_mqueue,
125: ipc_mqueue_t set_mqueue)
126: {
127: wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
128: wait_queue_t set_waitq = &set_mqueue->imq_wait_queue;
129:
130: return (wait_queue_member(port_waitq, set_waitq));
131:
132: }
133:
134: /*
135: * Routine: ipc_mqueue_remove
136: * Purpose:
137: * Remove the association between the queue and the specified
138: * subordinate message queue.
139: */
140:
141: kern_return_t
142: ipc_mqueue_remove(
143: ipc_mqueue_t mqueue,
144: ipc_mqueue_t sub_mqueue)
145: {
146: wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
147: wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue;
148:
149: if (wait_queue_member(mq_waitq, sub_waitq)) {
150: wait_queue_unlink(mq_waitq, sub_waitq);
151: return KERN_SUCCESS;
152: }
153: return KERN_NOT_IN_SET;
154: }
155:
156: /*
157: * Routine: ipc_mqueue_remove_one
158: * Purpose:
159: * Find and remove one subqueue from the queue.
160: * Conditions:
161: * Will return the set mqueue that was removed
162: */
163: void
164: ipc_mqueue_remove_one(
165: ipc_mqueue_t mqueue,
166: ipc_mqueue_t *sub_queuep)
167: {
168: wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
169:
170: wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep);
171: return;
172: }
173:
174:
175: /*
176: * Routine: ipc_mqueue_add
177: * Purpose:
178: * Associate the portset's mqueue with the port's mqueue.
179: * This has to be done so that posting the port will wakeup
180: * a portset waiter. If there are waiters on the portset
181: * mqueue and messages on the port mqueue, try to match them
182: * up now.
183: * Conditions:
184: * May block.
185: */
186: kern_return_t
187: ipc_mqueue_add(
188: ipc_mqueue_t port_mqueue,
189: ipc_mqueue_t set_mqueue)
190: {
191: wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
192: wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue;
193: ipc_kmsg_queue_t kmsgq;
194: ipc_kmsg_t kmsg, next;
195: kern_return_t kr;
196:
197: kr = wait_queue_link(port_waitq, set_waitq);
198: if (kr != KERN_SUCCESS)
199: return kr;
200:
201: /*
202: * Now that the set has been added to the port, there may be
203: * messages queued on the port and threads waiting on the set
204: * waitq. Lets get them together.
205: */
206: imq_lock(port_mqueue);
207: kmsgq = &port_mqueue->imq_messages;
208: for (kmsg = ipc_kmsg_queue_first(kmsgq);
209: kmsg != IKM_NULL;
210: kmsg = next) {
211: next = ipc_kmsg_queue_next(kmsgq, kmsg);
212:
213: for (;;) {
214: thread_t th;
215: spl_t s;
216:
217: s = splsched();
218: th = wait_queue_wakeup_identity_locked(port_waitq,
219: IPC_MQUEUE_RECEIVE,
220: THREAD_AWAKENED,
221: FALSE);
222: /* waitq/mqueue still locked, thread locked */
223:
224: if (th == THREAD_NULL) {
225: splx(s);
226: goto leave;
227: }
228:
229: /*
230: * Found a receiver. see if they can handle the message
231: * correctly (the message is not too large for them, or
232: * they didn't care to be informed that the message was
233: * too large). If they can't handle it, take them off
234: * the list and let them go back and figure it out and
235: * just move onto the next.
236: */
237: if (th->ith_msize <
238: kmsg->ikm_header.msgh_size +
239: REQUESTED_TRAILER_SIZE(th->ith_option)) {
240: th->ith_state = MACH_RCV_TOO_LARGE;
241: th->ith_msize = kmsg->ikm_header.msgh_size;
242: if (th->ith_option & MACH_RCV_LARGE) {
243: /*
244: * let him go without message
245: */
246: th->ith_kmsg = IKM_NULL;
247: th->ith_seqno = 0;
248: thread_unlock(th);
249: splx(s);
250: continue; /* find another thread */
251: }
252: } else {
253: th->ith_state = MACH_MSG_SUCCESS;
254: }
255:
256: /*
257: * This thread is going to take this message,
258: * so give it to him.
259: */
260: ipc_mqueue_release_msgcount(port_mqueue);
261: ipc_kmsg_rmqueue(kmsgq, kmsg);
262: th->ith_kmsg = kmsg;
263: th->ith_seqno = port_mqueue->imq_seqno++;
264: thread_unlock(th);
265: splx(s);
266: break; /* go to next message */
267: }
268:
269: }
270: leave:
271: imq_unlock(port_mqueue);
272: return KERN_SUCCESS;
273: }
274:
275: /*
276: * Routine: ipc_mqueue_changed
277: * Purpose:
278: * Wake up receivers waiting in a message queue.
279: * Conditions:
280: * The message queue is locked.
281: */
282:
283: void
284: ipc_mqueue_changed(
285: ipc_mqueue_t mqueue)
286: {
287: wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
288: IPC_MQUEUE_RECEIVE,
289: THREAD_RESTART,
290: FALSE); /* unlock waitq? */
291: }
292:
293:
294:
295:
296: /*
297: * Routine: ipc_mqueue_send
298: * Purpose:
299: * Send a message to a message queue. The message holds a reference
300: * for the destination port for this message queue in the
301: * msgh_remote_port field.
302: *
303: * If unsuccessful, the caller still has possession of
304: * the message and must do something with it. If successful,
305: * the message is queued, given to a receiver, or destroyed.
306: * Conditions:
307: * Nothing locked.
308: * Returns:
309: * MACH_MSG_SUCCESS The message was accepted.
310: * MACH_SEND_TIMED_OUT Caller still has message.
311: * MACH_SEND_INTERRUPTED Caller still has message.
312: */
313: mach_msg_return_t
314: ipc_mqueue_send(
315: ipc_mqueue_t mqueue,
316: ipc_kmsg_t kmsg,
317: mach_msg_option_t option,
318: mach_msg_timeout_t timeout)
319: {
320: int save_wait_result;
321:
322: /*
323: * Don't block if:
324: * 1) We're under the queue limit.
325: * 2) Caller used the MACH_SEND_ALWAYS internal option.
326: * 3) Message is sent to a send-once right.
327: */
328: imq_lock(mqueue);
329:
330: if (!imq_full(mqueue) ||
331: (option & MACH_SEND_ALWAYS) ||
332: (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
333: MACH_MSG_TYPE_PORT_SEND_ONCE)) {
334: mqueue->imq_msgcount++;
335: imq_unlock(mqueue);
336:
337: } else {
338:
339: /*
340: * We have to wait for space to be granted to us.
341: */
342: if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) {
343: imq_unlock(mqueue);
344: return MACH_SEND_TIMED_OUT;
345: }
346: mqueue->imq_fullwaiters = TRUE;
347: wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
348: IPC_MQUEUE_FULL,
349: THREAD_ABORTSAFE,
350: TRUE); /* unlock? */
351: /* wait/mqueue is unlocked */
352:
353: if (option & MACH_SEND_TIMEOUT)
354: thread_set_timer(timeout, 1000*NSEC_PER_USEC);
355:
356: counter(c_ipc_mqueue_send_block++);
357: save_wait_result = thread_block((void (*)(void)) 0);
358:
359: if (option & MACH_SEND_TIMEOUT)
360: thread_cancel_timer();
361:
362:
363: switch (save_wait_result) {
364: case THREAD_TIMED_OUT:
365: assert(option & MACH_SEND_TIMEOUT);
366: return MACH_SEND_TIMED_OUT;
367:
368: case THREAD_AWAKENED:
369: break; /* we can proceed - inherited msgcount from waker */
370:
371: case THREAD_INTERRUPTED:
372: return MACH_SEND_INTERRUPTED;
373:
374: case THREAD_RESTART:
375: default:
376: panic("ipc_mqueue_send");
377: }
378: }
379:
380: ipc_mqueue_post(mqueue, kmsg);
381: return MACH_MSG_SUCCESS;
382: }
383:
384: /*
385: * Routine: ipc_mqueue_release_msgcount
386: * Purpose:
387: * Release a message queue reference in the case where we
388: * found a waiter.
389: *
390: * Conditions:
391: * The message queue is locked
392: */
393: void
394: ipc_mqueue_release_msgcount(
395: ipc_mqueue_t mqueue)
396: {
397: assert(imq_held(mqueue));
398: assert(mqueue->imq_msgcount > 0);
399:
400: mqueue->imq_msgcount--;
401: if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
402: if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
403: IPC_MQUEUE_FULL,
404: THREAD_AWAKENED,
405: FALSE) != KERN_SUCCESS) {
406: mqueue->imq_fullwaiters = FALSE;
407: } else {
408: mqueue->imq_msgcount++; /* gave it away */
409: }
410: }
411: }
412:
413: /*
414: * Routine: ipc_mqueue_post
415: * Purpose:
416: * Post a message to a waiting receiver or enqueue it. If a
417: * receiver is waiting, we can release our reserved space in
418: * the message queue.
419: *
420: * Conditions:
421: * If we need to queue, our space in the message queue is reserved.
422: */
423: void
424: ipc_mqueue_post(
425: register ipc_mqueue_t mqueue,
426: register ipc_kmsg_t kmsg)
427: {
428:
429: /*
430: * While the msg queue is locked, we have control of the
431: * kmsg, so the ref in it for the port is still good.
432: *
433: * Check for a receiver for the message.
434: */
435: imq_lock(mqueue);
436: for (;;) {
437: wait_queue_t waitq = &mqueue->imq_wait_queue;
438: thread_t receiver;
439: spl_t s;
440:
441: s = splsched();
442: receiver = wait_queue_wakeup_identity_locked(waitq,
443: IPC_MQUEUE_RECEIVE,
444: THREAD_AWAKENED,
445: FALSE);
446: /* waitq still locked, thread locked */
447:
448: if (receiver == THREAD_NULL) {
449: /*
450: * no receivers; queue kmsg
451: */
452: assert(mqueue->imq_msgcount > 0);
453: ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
454: splx(s);
455: break;
456: }
457:
458: /*
459: * We found a waiting thread.
460: * If the message is too large or the scatter list is too small
461: * the thread we wake up will get that as its status.
462: */
463: if (receiver->ith_msize <
464: (kmsg->ikm_header.msgh_size) +
465: REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
466: receiver->ith_msize = kmsg->ikm_header.msgh_size;
467: receiver->ith_state = MACH_RCV_TOO_LARGE;
468: } else {
469: receiver->ith_state =
470: #ifdef DOITFORSCATTER
471: (receiver->ith_scatter_list != MACH_MSG_BODY_NULL) ?
472: ipc_kmsg_check_scatter(kmsg,
473: receiver->ith_option,
474: &receiver->ith_scatter_list,
475: &receiver->ith_scatter_list_size) :
476: #endif /* DOITFORSCATTER */
477: MACH_MSG_SUCCESS;
478: }
479:
480: /*
481: * If there is no problem with the upcoming receive, or the
482: * receiver thread didn't specifically ask for special too
483: * large error condition, go ahead and select it anyway.
484: */
485: if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
486: !(receiver->ith_option & MACH_RCV_LARGE)) {
487:
488: receiver->ith_kmsg = kmsg;
489: receiver->ith_seqno = mqueue->imq_seqno++;
490: thread_unlock(receiver);
491: splx(s);
492:
493: /* we didn't need our reserved spot in the queue */
494: ipc_mqueue_release_msgcount(mqueue);
495: break;
496: }
497:
498: /*
499: * Otherwise, this thread needs to be released to run
500: * and handle its error without getting the message. We
501: * need to go back and pick another one.
502: */
503: receiver->ith_kmsg = IKM_NULL;
504: receiver->ith_seqno = 0;
505: thread_unlock(receiver);
506: splx(s);
507: }
508:
509: imq_unlock(mqueue);
510: current_task()->messages_sent++;
511: return;
512: }
513:
514:
515: /*
516: * Routine: ipc_mqueue_receive
517: * Purpose:
518: * Receive a message from a message queue.
519: *
520: * If continuation is non-zero, then we might discard
521: * our kernel stack when we block. We will continue
522: * after unblocking by executing continuation.
523: *
524: * If resume is true, then we are resuming a receive
525: * operation after a blocked receive discarded our stack.
526: * Conditions:
527: * Our caller must hold a reference for the port or port set
528: * to which this queue belongs, to keep the queue
529: * from being deallocated.
530: *
531: * The kmsg is returned with clean header fields
532: * and with the circular bit turned off.
533: * Returns:
534: * MACH_MSG_SUCCESS Message returned in kmsgp.
535: * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
536: * MACH_RCV_TIMED_OUT No message obtained.
537: * MACH_RCV_INTERRUPTED No message obtained.
538: * MACH_RCV_PORT_DIED Port/set died; no message.
539: * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
540: *
541: */
542:
543: mach_msg_return_t
544: ipc_mqueue_receive(
545: ipc_mqueue_t mqueue,
546: mach_msg_option_t option,
547: mach_msg_size_t max_size,
548: mach_msg_timeout_t timeout,
549: int interruptible,
550: ipc_kmsg_t *kmsgp,
551: mach_port_seqno_t *seqnop)
552: {
553: ipc_port_t port;
554: mach_msg_return_t mr, mr2;
555: ipc_kmsg_queue_t kmsgs;
556: kern_return_t save_wait_result;
557: thread_t self;
558:
559: imq_lock(mqueue);
560:
561: if (imq_is_set(mqueue)) {
562: wait_queue_link_t wql;
563: ipc_mqueue_t port_mq;
564: queue_t q;
565:
566: q = &mqueue->imq_setlinks;
567:
568: /*
569: * If we are waiting on a portset mqueue, we need to see if
570: * any of the member ports have work for us. If so, try to
571: * deliver one of those messages. By holding the portset's
572: * mqueue lock during the search, we tie up any attempts by
573: * mqueue_deliver or portset membership changes that may
574: * cross our path. But this is a lock order violation, so we
575: * have to do it "softly." If we don't find a message waiting
576: * for us, we will assert our intention to wait while still
577: * holding that lock. When we release the lock, the deliver/
578: * change will succeed and find us.
579: */
580: search_set:
581: queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) {
582: port_mq = (ipc_mqueue_t)wql->wql_queue;
583: kmsgs = &port_mq->imq_messages;
584:
585: if (!ipc_kmsg_queue_first(kmsgs)) /* peek first */
586: continue;
587:
588: if (!imq_lock_try(port_mq)) {
589: imq_unlock(mqueue);
590: imq_lock(mqueue);
591: goto search_set; /* start again at beginning - SMP */
592: }
593:
594: /*
595: * If there is still a message to be had, we will
596: * try to select it (may not succeed because of size
597: * and options). In any case, we deliver those
598: * results back to the user.
599: *
600: * We also move the port's linkage to the tail of the
601: * list for this set (fairness). Future versions will
602: * sort by timestamp or priority.
603: */
604: if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
605: imq_unlock(port_mq);
606: continue;
607: }
608: queue_remove(q, wql, wait_queue_link_t, wql_sublinks);
609: queue_enter(q, wql, wait_queue_link_t, wql_sublinks);
610: imq_unlock(mqueue);
611:
612: mr = ipc_mqueue_select(port_mq, option, max_size, kmsgp, seqnop);
613: imq_unlock(port_mq);
614: return mr;
615:
616: }
617:
618: } else {
619:
620: /*
621: * Receive on a single port. Just try to get the messages.
622: */
623: kmsgs = &mqueue->imq_messages;
624: if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
625: mr = ipc_mqueue_select(mqueue, option, max_size, kmsgp, seqnop);
626: imq_unlock(mqueue);
627: return mr;
628: }
629: }
630:
631: /*
632: * Looks like we'll have to block. The mqueue we will
633: * block on (whether the set's or the local port's) is
634: * still locked.
635: */
636: if (option & MACH_RCV_TIMEOUT) {
637: if (timeout == 0) {
638: imq_unlock(mqueue);
639: return MACH_RCV_TIMED_OUT;
640: }
641: }
642: self = current_thread();
643:
644: self->ith_state = MACH_RCV_IN_PROGRESS;
645: self->ith_option = option;
646: self->ith_msize = max_size;
647:
648: wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
649: IPC_MQUEUE_RECEIVE,
650: interruptible,
651: TRUE); /* unlock? */
652: /* mqueue/waitq is unlocked */
653:
654: if (option & MACH_RCV_TIMEOUT) {
655: thread_set_timer(timeout, 1000*NSEC_PER_USEC);
656: }
657:
658: if (interruptible == THREAD_ABORTSAFE) {
659: counter(c_ipc_mqueue_receive_block_user++);
660: } else {
661: counter(c_ipc_mqueue_receive_block_kernel++);
662: }
663:
664: save_wait_result = thread_block((void (*)(void))0);
665:
666: if (option & MACH_RCV_TIMEOUT)
667: thread_cancel_timer();
668:
669: /*
670: * why did we wake up?
671: */
672: switch (save_wait_result) {
673: case THREAD_TIMED_OUT:
674: return MACH_RCV_TIMED_OUT;
675:
676: case THREAD_INTERRUPTED:
677: return MACH_RCV_INTERRUPTED;
678:
679: case THREAD_RESTART:
680: /* something bad happened to the port/set */
681: return MACH_RCV_PORT_CHANGED;
682:
683: case THREAD_AWAKENED:
684: /*
685: * We do not need to go select a message, somebody
686: * handed us one (or a too-large indication).
687: */
688:
689: mr = MACH_MSG_SUCCESS;
690:
691: switch (self->ith_state) {
692: case MACH_RCV_SCATTER_SMALL:
693: case MACH_RCV_TOO_LARGE:
694: /*
695: * Somebody tried to give us a too large
696: * message. If we indicated that we cared,
697: * then they only gave us the indication,
698: * otherwise they gave us the indication
699: * AND the message anyway.
700: */
701: if (option & MACH_RCV_LARGE) {
702: if (self->ith_state == MACH_RCV_TOO_LARGE)
703: *kmsgp = (ipc_kmsg_t)self->ith_msize;
704: return(self->ith_state);
705: } else {
706: mr = self->ith_state;
707: /* fall thru to SUCCESS case */
708: }
709:
710: case MACH_MSG_SUCCESS:
711: /* pick up the message that was handed to us */
712: *kmsgp = self->ith_kmsg;
713: *seqnop = self->ith_seqno;
714: return mr;
715:
716: default:
717: panic("ipc_mqueue_receive: strange ith_state");
718: }
719:
720: default:
721: panic("ipc_mqueue_receive: strange wait_result");
722: }
723: }
724:
725:
726: /*
727: * Routine: ipc_mqueue_select
728: * Purpose:
729: * A receiver discovered that there was a message on the queue
730: * before he had to block. Pick the message off the queue and
731: * "post" it to himself.
732: * Conditions:
733: * mqueue locked.
734: * There is a message.
735: * Returns:
736: * MACH_MSG_SUCCESS Actually selected a message for ourselves.
737: * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
738: */
739: kern_return_t
740: ipc_mqueue_select(
741: ipc_mqueue_t mqueue,
742: mach_msg_option_t option,
743: mach_msg_size_t max_size,
744: ipc_kmsg_t *kmsgp,
745: mach_port_seqno_t *seqnop)
746: {
747: ipc_kmsg_t kmsg;
748: mach_port_seqno_t seqno;
749: mach_msg_return_t mr;
750:
751: mr = MACH_MSG_SUCCESS;
752:
753:
754: /*
755: * Do some sanity checking of our ability to receive
756: * before pulling the message off the queue.
757: */
758: kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
759:
760: assert(kmsg != IKM_NULL);
761:
762: if (kmsg->ikm_header.msgh_size +
763: REQUESTED_TRAILER_SIZE(option) > max_size) {
764: mr = MACH_RCV_TOO_LARGE;
765: }
766: #ifdef DOITFORSCATTER
767: else if (self->ith_scatter_list != MACH_MSG_BODY_NULL) {
768: mr = ipc_kmsg_check_scatter(
769: kmsg,
770: self->ith_option,
771: &self->ith_scatter_list,
772: &self->ith_scatter_list_size);
773: }
774: #endif /* DOITFORSCATTER */
775:
776: /*
777: * If we really can't receive it, but we had the
778: * MACH_RCV_LARGE option set, then don't take it off
779: * the queue, instead return the appropriate error
780: * (and size needed).
781: */
782: if (((mr == MACH_RCV_TOO_LARGE) ||
783: (mr == MACH_RCV_SCATTER_SMALL)) &&
784: (option & MACH_RCV_LARGE)) {
785: *seqnop = 0;
786: *kmsgp = (ipc_kmsg_t)kmsg->ikm_header.msgh_size;
787: return mr;
788: }
789:
790: ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
791: ipc_mqueue_release_msgcount(mqueue);
792: *seqnop = mqueue->imq_seqno++;
793: *kmsgp = kmsg;
794:
795: current_task()->messages_received++;
796: return mr;
797: }
798:
799: /*
800: * Routine: ipc_mqueue_destroy
801: * Purpose:
802: * Destroy a message queue. Set any blocked senders running.
803: * Destroy the kmsgs in the queue.
804: * Conditions:
805: * Nothing locked.
806: * Receivers were removed when the receive right was "changed"
807: */
808: void
809: ipc_mqueue_destroy(
810: ipc_mqueue_t mqueue)
811: {
812: ipc_kmsg_queue_t kmqueue;
813: ipc_kmsg_t kmsg;
814:
815: imq_lock(mqueue);
816: /*
817: * rouse all blocked senders
818: */
819: mqueue->imq_fullwaiters = FALSE;
820: wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
821: IPC_MQUEUE_FULL,
822: THREAD_AWAKENED,
823: FALSE);
824:
825: kmqueue = &mqueue->imq_messages;
826:
827: while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
828: imq_unlock(mqueue);
829: ipc_kmsg_destroy_dest(kmsg);
830: imq_lock(mqueue);
831: }
832: imq_unlock(mqueue);
833: }
834:
835: /*
836: * Routine: ipc_mqueue_set_qlimit
837: * Purpose:
838: * Changes a message queue limit; the maximum number
839: * of messages which may be queued.
840: * Conditions:
841: * Nothing locked.
842: */
843:
844: void
845: ipc_mqueue_set_qlimit(
846: ipc_mqueue_t mqueue,
847: mach_port_msgcount_t qlimit)
848: {
849: /* wake up senders allowed by the new qlimit */
850: imq_lock(mqueue);
851: if (qlimit > mqueue->imq_qlimit) {
852: mach_port_msgcount_t i, wakeup;
853:
854: /* caution: wakeup, qlimit are unsigned */
855: wakeup = qlimit - mqueue->imq_qlimit;
856:
857: for (i = 0; i < wakeup; i++) {
858: if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
859: IPC_MQUEUE_FULL,
860: THREAD_AWAKENED,
861: FALSE) == KERN_NOT_WAITING) {
862: mqueue->imq_fullwaiters = FALSE;
863: break;
864: }
865: }
866: }
867: mqueue->imq_qlimit = qlimit;
868: imq_unlock(mqueue);
869: }
870:
871: /*
872: * Routine: ipc_mqueue_set_seqno
873: * Purpose:
874: * Changes an mqueue's sequence number.
875: * Conditions:
876: * Caller holds a reference to the queue's containing object.
877: */
878: void
879: ipc_mqueue_set_seqno(
880: ipc_mqueue_t mqueue,
881: mach_port_seqno_t seqno)
882: {
883: imq_lock(mqueue);
884: mqueue->imq_seqno = seqno;
885: imq_unlock(mqueue);
886: }
887:
888:
889: /*
890: * Routine: ipc_mqueue_copyin
891: * Purpose:
892: * Convert a name in a space to a message queue.
893: * Conditions:
894: * Nothing locked. If successful, the caller gets a ref for
895: * for the object. This ref ensures the continued existence of
896: * the queue.
897: * Returns:
898: * MACH_MSG_SUCCESS Found a message queue.
899: * MACH_RCV_INVALID_NAME The space is dead.
900: * MACH_RCV_INVALID_NAME The name doesn't denote a right.
901: * MACH_RCV_INVALID_NAME
902: * The denoted right is not receive or port set.
903: * MACH_RCV_IN_SET Receive right is a member of a set.
904: */
905:
906: mach_msg_return_t
907: ipc_mqueue_copyin(
908: ipc_space_t space,
909: mach_port_name_t name,
910: ipc_mqueue_t *mqueuep,
911: ipc_object_t *objectp)
912: {
913: ipc_entry_t entry;
914: ipc_object_t object;
915: ipc_mqueue_t mqueue;
916:
917: is_read_lock(space);
918: if (!space->is_active) {
919: is_read_unlock(space);
920: return MACH_RCV_INVALID_NAME;
921: }
922:
923: entry = ipc_entry_lookup(space, name);
924: if (entry == IE_NULL) {
925: is_read_unlock(space);
926: return MACH_RCV_INVALID_NAME;
927: }
928:
929: object = entry->ie_object;
930:
931: if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
932: ipc_port_t port;
933: ipc_pset_t pset;
934:
935: port = (ipc_port_t) object;
936: assert(port != IP_NULL);
937:
938: ip_lock(port);
939: assert(ip_active(port));
940: assert(port->ip_receiver_name == name);
941: assert(port->ip_receiver == space);
942: is_read_unlock(space);
943: mqueue = &port->ip_messages;
944:
945: } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
946: ipc_pset_t pset;
947:
948: pset = (ipc_pset_t) object;
949: assert(pset != IPS_NULL);
950:
951: ips_lock(pset);
952: assert(ips_active(pset));
953: assert(pset->ips_local_name == name);
954: is_read_unlock(space);
955:
956: mqueue = &pset->ips_messages;
957: } else {
958: is_read_unlock(space);
959: return MACH_RCV_INVALID_NAME;
960: }
961:
962: /*
963: * At this point, the object is locked and active,
964: * the space is unlocked, and mqueue is initialized.
965: */
966:
967: io_reference(object);
968: io_unlock(object);
969:
970: *objectp = object;
971: *mqueuep = mqueue;
972: return MACH_MSG_SUCCESS;
973: }
974:
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.