|
|
1.1 root 1: /*
2: * Copyright (c) 1998-2000 Apple Computer, Inc. All rights reserved.
3: *
4: * @APPLE_LICENSE_HEADER_START@
5: *
6: * The contents of this file constitute Original Code as defined in and
7: * are subject to the Apple Public Source License Version 1.1 (the
8: * "License"). You may not use this file except in compliance with the
9: * License. Please obtain a copy of the License at
10: * http://www.apple.com/publicsource and read it before using this file.
11: *
12: * This Original Code and all software distributed under the License are
13: * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
14: * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
15: * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
16: * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the
17: * License for the specific language governing rights and limitations
18: * under the License.
19: *
20: * @APPLE_LICENSE_HEADER_END@
21: */
22: /*
23: * Copyright (c) 1998 Apple Computer, Inc. All rights reserved.
24: *
25: * IOOutputQueue.cpp
26: *
27: * HISTORY
28: * 2-Feb-1999 Joe Liu (jliu) created.
29: *
30: */
31:
32: #include <IOKit/assert.h>
33: #include <IOKit/IOWorkLoop.h>
34: #include <IOKit/network/IOPacketQueue.h>
35: #include <IOKit/network/IOOutputQueue.h>
36: #include <IOKit/network/IOOQLockFIFOQueue.h>
37: #include <IOKit/network/IOOQGateFIFOQueue.h>
38: #include <IOKit/network/IONetworkStats.h>
39: #include <IOKit/network/IONetworkController.h>
40:
41: //===========================================================================
42: // IOOutputQueue class.
43: //
44: // An object that queues output packets on behalf of its target, usually an
45: // IONetworkController instance, and allows the target to control the packet
46: // flow. IOOutputQueue is is an abstract class that provides an interface for
47: // its subclasses. Concrete subclasses will complete the implementation while
48: // providing unique synchronization options.
49: //===========================================================================
50:
51: #warning need real atomic operators
52: #define ATOMIC_SET(x, v) ((x) = (v))
53: #define ATOMIC_ADD(x, y) ((x) += (y))
54:
55: #undef super
56: #define super OSObject
57: OSDefineMetaClass( IOOutputQueue, OSObject )
58: OSDefineAbstractStructors( IOOutputQueue, OSObject )
59:
60: #define kIOOQParamMagic ((void *) 0xfeedbeef)
61:
62: //---------------------------------------------------------------------------
63: // Initialize an IOOutputQueue instance.
64: //
65: // target: The object that shall receive packets from the queue,
66: // and is usually a subclass of IONetworkController. If the target is
67: // not an IONetworkController instance, then the target must immediately
68: // call registerOutputHandler() after initializing the queue.
69: //
70: // capacity: The initial capacity of the output queue, defined as
71: // the number of packets that the queue can hold without dropping.
72: //
73: // Returns true if initialized successfully, false otherwise.
74:
75: bool IOOutputQueue::init(OSObject * target,
76: UInt32 capacity)
77: {
78: #if 0
79: _target = 0;
80: _outAction = 0;
81: _stats = 0;
82: _param = 0;
83: _callEntry = 0;
84: _queue = 0;
85: #endif
86:
87: if (!super::init())
88: return false;
89:
90: if (OSDynamicCast(IONetworkController, target))
91: {
92: // If the target is a type of IONetworkController, then call
93: // its getOutputHandler() method to obtain a pointer to its
94: // output method.
95:
96: if (registerOutputHandler(target,
97: ((IONetworkController *) target)->getOutputHandler()) == false)
98: return false;
99: }
100:
101: // Allocate an IOPacketQueue (or a subclass).
102: //
103: _queue = createPacketQueue(capacity);
104: if (!_queue)
105: return false;
106:
107: // Allocate and initialize the callout entry.
108: //
109: _callEntry = thread_call_allocate((thread_call_func_t)&runServiceThread,
110: (void *)this);
111: if (!_callEntry)
112: return false;
113:
114: // Create a data object for queue statistics.
115: //
116: _statsData = IONetworkData::withName(
117: kIOOutputQueueStatsKey,
118: sizeof(IOOutputQueueStats),
119: 0,
120: kIONDBasicAccessTypes | kIONDAccessTypeReset,
121: this,
122: (IONetworkData::Action)
123: &IOOutputQueue::dataReadAndResetHandler,
124: kIOOQParamMagic);
125: if (!_statsData)
126: return false;
127:
128: _stats = (IOOutputQueueStats *) _statsData->getBuffer();
129:
130: return true;
131: }
132:
133: //---------------------------------------------------------------------------
134: // Frees the IOOutputQueue instance.
135:
136: void IOOutputQueue::free()
137: {
138: if (_statsData)
139: _statsData->release();
140:
141: if (_callEntry)
142: {
143: cancelServiceThread();
144: thread_call_free(_callEntry);
145: }
146:
147: if (_queue)
148: _queue->release();
149:
150: super::free();
151: }
152:
153: //---------------------------------------------------------------------------
154: // This method is called by our IONetworkData object when it receives
155: // a read or a reset request. We need to be notified in order to intervene
156: // in the request handling.
157: //
158: // data: The IONetworkData object.
159: // opFlag: The operation requested.
160: // arg: Argument for the request.
161:
162: IOReturn IOOutputQueue::dataReadAndResetHandler(IONetworkData * data,
163: UInt32 type,
164: void * arg)
165: {
166: IOReturn ret = kIOReturnSuccess;
167:
168: assert(data && (arg == kIOOQParamMagic));
169:
170: // Check the type of data request.
171: //
172: switch (type)
173: {
174: case kIONDAccessTypeRead:
175: case kIONDAccessTypeSerialize:
176: _stats->capacity = getCapacity();
177: _stats->size = getSize();
178: _stats->peakSize = getPeakSize();
179: break;
180:
181: case kIONDAccessTypeReset:
182: getPeakSize(true);
183: getDropCount(true);
184: getOutputCount(true);
185: getRetryCount(true);
186: getStallCount(true);
187: break;
188:
189: default:
190: ret = kIOReturnNotWritable;
191: break;
192: }
193:
194: return ret;
195: }
196:
197: //---------------------------------------------------------------------------
198: // Returns the current state of the queue object.
199:
200: IOOQState IOOutputQueue::getState() const
201: {
202: return _state;
203: }
204:
205: //---------------------------------------------------------------------------
206: // Schedule a service thread callout, which will then execute the
207: // serviceThread() method. Subclasses should not override this method.
208:
209: bool IOOutputQueue::scheduleServiceThread()
210: {
211: thread_call_enter(_callEntry);
212: return true;
213: }
214:
215: //---------------------------------------------------------------------------
216: // Cancel the service thread callout. Subclasses should not override this
217: // method.
218:
219: bool IOOutputQueue::cancelServiceThread()
220: {
221: return thread_call_cancel(_callEntry);
222: }
223:
224: //---------------------------------------------------------------------------
225: // A glue function that is registered as the service thread callout handler.
226: // This function in turn will call the serviceThread() method.
227:
228: void IOOutputQueue::runServiceThread(IOOutputQueue * self, thread_call_param_t)
229: {
230: assert(self);
231: self->serviceThread();
232: }
233:
234: //---------------------------------------------------------------------------
235: // Must be implemented by subclasses that calls scheduleServiceThread().
236: // The default implementation is a placeholder and performs no useful action.
237:
238: void IOOutputQueue::serviceThread()
239: {
240: }
241:
242: //---------------------------------------------------------------------------
243: // Release all packets held in the queue. The size of the queue is reset
244: // to zero. The drop packet counter is incremented by the number of packets
245: // freed. See getDropCount().
246: //
247: // Returns the number of packets freed.
248:
249: UInt32 IOOutputQueue::flush()
250: {
251: UInt32 flushCount = _queue->lockFlush();
252: ATOMIC_ADD(_stats->dropCount, flushCount);
253: return flushCount;
254: }
255:
256: //---------------------------------------------------------------------------
257: // capacity: The new capacity of the queue.
258: //
259: // Returns true if the new capacity was accepted, false otherwise.
260:
261: bool IOOutputQueue::setCapacity(UInt32 capacity)
262: {
263: return _queue->setCapacity(capacity);
264: }
265:
266: //---------------------------------------------------------------------------
267: // Returns the current queue capacity.
268:
269: UInt32 IOOutputQueue::getCapacity() const
270: {
271: return _queue->getCapacity();
272: }
273:
274: //---------------------------------------------------------------------------
275: // Returns the current queue size.
276:
277: UInt32 IOOutputQueue::getSize() const
278: {
279: return _queue->getSize();
280: }
281:
282: //---------------------------------------------------------------------------
283: // reset: Resets the counter if true.
284: //
285: // Return the peak queue size.
286:
287: UInt32 IOOutputQueue::getPeakSize(bool reset = false)
288: {
289: return _queue->getPeakSize(reset);
290: }
291:
292: //---------------------------------------------------------------------------
293: // This method returns the number of times that a kIOOQReturnStatusDropped
294: // status code is received from the target's output handler, indicating
295: // that the packet given was dropped. This count is also incremented when
296: // the queue drops a packet due to overcapacity, or by an explicit flush()
297: // call.
298: //
299: // reset: Resets the counter if true.
300: //
301: // Returns the number of dropped packets.
302: //
303: // FIXME - need atomic exchange
304:
305: UInt32 IOOutputQueue::getDropCount(bool reset = false)
306: {
307: UInt32 count = _stats->dropCount;
308: if (reset)
309: _stats->dropCount = 0;
310: return count;
311: }
312:
313: //---------------------------------------------------------------------------
314: // This method returns the number of times that a kIOOQReturnStatusSuccess
315: // status code is received from the target's output handler, indicating
316: // that the packet given was accepted, and is ready to be (or already has been)
317: // transmitted.
318: //
319: // reset: Resets the counter if true.
320: //
321: // Returns the number of packets accepted by our target.
322: //
323: // FIXME - need atomic exchange
324:
325: UInt32 IOOutputQueue::getOutputCount(bool reset = false)
326: {
327: UInt32 count = _stats->outputCount;
328: if (reset)
329: _stats->outputCount = 0;
330: return count;
331: }
332:
333: //---------------------------------------------------------------------------
334: // This method returns the number of times that a kIOOQReturnStatusRetry
335: // status code is received from the target's output handler, indicating
336: // that the target is temporarily unable to handle the packet given, and
337: // the queue should try to resend the same packet at some later time.
338: //
339: // reset: Resets the counter if true.
340: //
341: // Returns the number of retries issued by the target.
342: //
343: // FIXME - need atomic exchange
344:
345: UInt32 IOOutputQueue::getRetryCount(bool reset = false)
346: {
347: UInt32 count = _stats->retryCount;
348: if (reset)
349: _stats->retryCount = 0;
350: return count;
351: }
352:
353: //---------------------------------------------------------------------------
354: // Each time the queue is stalled, when a kIOOQReturnActionStall action code
355: // is received from the target's output handler, a counter is incremented.
356: // This method returns the value stored in this counter.
357: //
358: // reset: Resets the counter if true.
359: //
360: // Returns the number of times that the queue was stalled.
361: //
362: // FIXME - need atomic exchange
363:
364: UInt32 IOOutputQueue::getStallCount(bool reset = false)
365: {
366: UInt32 count = _stats->stallCount;
367: if (reset)
368: _stats->stallCount = 0;
369: return count;
370: }
371:
372: //---------------------------------------------------------------------------
373: // Allows subclasses to override the default action, which is to allocate
374: // and return an IOPacketQueue instance. The returned object is used to
375: // implement the queueing behavior of the IOOutputQueue.
376: //
377: // capacity: The initial capacity of the queue.
378: //
379: // Returns a newly allocated and initialized IOPacketQueue instance.
380:
381: IOPacketQueue * IOOutputQueue::createPacketQueue(UInt32 capacity) const
382: {
383: return IOPacketQueue::withCapacity(capacity);
384: }
385:
386: //---------------------------------------------------------------------------
387: // Return an address of a method that is designated to handle
388: // packets sent to the queue object.
389: //
390: // Returns the address of the output packet handler.
391:
392: IOOutputAction IOOutputQueue::getOutputHandler() const
393: {
394: return (IOOutputAction) &IOOutputQueue::enqueue;
395: }
396:
397: //---------------------------------------------------------------------------
398: // Register the target object and method to call to handle packets
399: // removed from the queue.
400: //
401: // handler: Pointer to an initialized IOOutputHandler structure passed in
402: // by the caller.
403: //
404: // Returns true if the structure given was accepted, otherwise return false.
405:
406: bool IOOutputQueue::registerOutputHandler(OSObject * target,
407: IOOutputAction action)
408: {
409: if (!target || !action)
410: return false;
411:
412: // Cache the structure fields to instance variables.
413: //
414: _target = target;
415: _outAction = action;
416:
417: return true;
418: }
419:
420: //---------------------------------------------------------------------------
421: // Return an IONetworkData object containing an IOOutputQueueStats structure.
422:
423: IONetworkData * IOOutputQueue::getStatisticsData() const
424: {
425: return _statsData;
426: }
427:
428: //===========================================================================
429: // IOOQLockFIFOQueue
430: //===========================================================================
431:
432: #undef super
433: #define super IOOutputQueue
434: OSDefineMetaClassAndStructors( IOOQLockFIFOQueue, IOOutputQueue )
435:
436: #define LOCK IOTakeLock(_mutex)
437: #define UNLOCK IOUnlock(_mutex)
438:
439: //---------------------------------------------------------------------------
440: // Initialize an IOOQLockFIFOQueue instance.
441: //
442: // target: The object that shall receive packets from the queue,
443: // and is usually a subclass of IONetworkController. If the target is
444: // not an IONetworkController instance, then the target must immediately
445: // call registerOutputHandler() after initializing the queue.
446: //
447: // capacity: The initial capacity of the output queue, defined as
448: // the number of packets that the queue can hold without dropping.
449: //
450: // Returns true if initialized successfully, false otherwise.
451:
452: bool IOOQLockFIFOQueue::init(OSObject * target, UInt32 capacity = 0)
453: {
454: if (!super::init(target, capacity))
455: return false;
456:
457: // Allocate the mutex lock.
458: //
459: _mutex = IOLockAlloc();
460: if (!_mutex)
461: return false;
462:
463: ATOMIC_SET(_state, kIOOQStateStopped);
464:
465: return true;
466: }
467:
468: //---------------------------------------------------------------------------
469: // Factory method that will construct and initialize an IOOQLockFIFOQueue
470: // instance.
471:
472: IOOQLockFIFOQueue * IOOQLockFIFOQueue::withTarget(OSObject * target,
473: UInt32 capacity = 0)
474: {
475: IOOQLockFIFOQueue * queue = new IOOQLockFIFOQueue;
476:
477: if (queue && !queue->init(target, capacity))
478: {
479: queue->release();
480: queue = 0;
481: }
482: return queue;
483: }
484:
485: //---------------------------------------------------------------------------
486: // Frees the IOOQLockFIFOQueue instance.
487:
488: void IOOQLockFIFOQueue::free()
489: {
490: cancelServiceThread();
491:
492: if (_mutex) IOLockFree(_mutex);
493:
494: super::free();
495: }
496:
497: //---------------------------------------------------------------------------
498: // Provide an implementation for the interface defined in IOOutputQueue.
499: // This method is called by a callout thread when service() is performed
500: // asynchronously.
501:
502: void IOOQLockFIFOQueue::serviceThread()
503: {
504: // Bump the enqueue count, and service the queue if 'serviceActive'
505: // is false.
506:
507: ATOMIC_ADD(_enqueueCount, 1);
508:
509: if ((_state == kIOOQStateRunning) && !_serviceActive)
510: {
511: dequeue();
512: }
513: }
514:
515: //---------------------------------------------------------------------------
516: // Handles packet (or a packet chain) sent to the queue. This method can
517: // handle calls from multiple simultaneous client threads. A client thread
518: // that calls enqueue() will acquire a mutex lock and call dequeue() if it
519: // detects that no other thread is actively dequeueing packets from the queue.
520: // The dequeue() method will return when the queue becomes empty, or if the
521: // target stalls the queue. This method may block its caller.
522: //
523: // m: The packet (or a packet chain) to be queued for transmission.
524: //
525: // Returns: The number of dropped packets.
526:
527: UInt32 IOOQLockFIFOQueue::enqueue(struct mbuf * m)
528: {
529: UInt32 dropped = _queue->lockEnqueue(m);
530:
531: // Increment the dropped packet counter.
532: if (dropped)
533: ATOMIC_ADD(_stats->dropCount, dropped);
534:
535: // Bump the enqueue count, and service the queue if 'serviceActive'
536: // is false.
537:
538: ATOMIC_ADD(_enqueueCount, 1);
539:
540: if ((_state == kIOOQStateRunning) && !_serviceActive)
541: {
542: dequeue();
543: }
544:
545: return dropped;
546: }
547:
548: //---------------------------------------------------------------------------
549: // Responsible for removing all packets from the queue and calling
550: // the target's output handler. This method returns when the queue becomes
551: // empty or if the queue's state is no longer kIOOQStateRunning. The target's
552: // output handler is called for every packet removed from the queue. Only a
553: // single packet is sent to the target for every call, the packets are never
554: // chained. A mutex lock enforces single threaded access to the target's
555: // output handler.
556:
557: void IOOQLockFIFOQueue::dequeue()
558: {
559: struct mbuf * m;
560: UInt32 r;
561: UInt32 dequeueCount;
562:
563: LOCK;
564:
565: do {
566: // Take a snapshot of the current enqueueCount.
567: //
568: dequeueCount = _enqueueCount;
569:
570: _serviceActive = true; // -- mark dequeuing active --
571:
572: while ((_state == kIOOQStateRunning) &&
573: _queue->getSize() && (m = _queue->lockDequeue()))
574: {
575: ATOMIC_SET(_state, kIOOQStateStalled);
576:
577: // Call the target's output handler.
578: //
579: r = (_target->*_outAction)(m);
580:
581: // Decide the fate of the dequeued packet and
582: // update statistics counters.
583: //
584: switch (r & kIOOQReturnStatusMask)
585: {
586: default:
587: case kIOOQReturnStatusDropped:
588: ATOMIC_ADD(_stats->dropCount, 1);
589: break;
590:
591: case kIOOQReturnStatusSuccess:
592: ATOMIC_ADD(_stats->outputCount, 1);
593: break;
594:
595: case kIOOQReturnStatusRetry:
596: _queue->lockPrepend(m);
597: ATOMIC_ADD(_stats->retryCount, 1);
598: break;
599: }
600:
601: // Handle the requested action.
602: //
603: switch (r & kIOOQReturnActionMask)
604: {
605: default:
606: case kIOOQReturnActionNone:
607: ATOMIC_SET(_state, kIOOQStateRunning);
608: break;
609:
610: case kIOOQReturnActionStall:
611: ATOMIC_ADD(_stats->stallCount, 1);
612: break;
613: }
614:
615: // Consume all enqueueCounts before looping.
616: //
617: dequeueCount = _enqueueCount;
618:
619: } /* while [ running and queue has packets ] */
620:
621: _serviceActive = false; // -- mark dequeuing inactive --
622:
623: } while (dequeueCount != _enqueueCount);
624:
625: UNLOCK;
626: }
627:
628: //---------------------------------------------------------------------------
629: // This method is called by the target to start the queue. Once started,
630: // the queue will be allowed to call the target's output handler.
631: // Before that, with the queue stopped, the queue will absorb incoming
632: // packets sent to the enqueue() method, but no packets will be dequeued,
633: // and the target's output handler will not be called.
634: //
635: // Returns true if the queue was successfully started, false otherwise.
636:
637: bool IOOQLockFIFOQueue::start()
638: {
639: service(true);
640: return true; // always return success
641: }
642:
643: //---------------------------------------------------------------------------
644: // Stops the queue to prevent it from calling the target's output handler.
645: // This call is synchronous the caller may block. The target's output handler
646: // must never call this method, or any other queue methods.
647:
648: void IOOQLockFIFOQueue::stop()
649: {
650: LOCK;
651: ATOMIC_SET(_state, kIOOQStateStopped);
652: UNLOCK;
653: }
654:
655: //---------------------------------------------------------------------------
656: // If the queue becomes stalled, then service() must be called to restart
657: // the queue when the target is ready to accept more packets. If the queue
658: // is not empty, this method will also call dequeue(). Note that if the
659: // target never sends a kIOOQReturnActionStall action code to the queue,
660: // then the queue will never stall on its own accord. Calling this method
661: // on a running queue that is not stalled is harmless.
662: //
663: // sync: True if the service action should be performed synchronously,
664: // false to perform the action asynchronously without blocking the caller,
665: // but with a much higher latency cost.
666: //
667: // Returns true if the queue needed servicing, false otherwise.
668:
669: bool IOOQLockFIFOQueue::service(bool sync = true)
670: {
671: bool started = false;
672:
673: ATOMIC_SET(_state, kIOOQStateRunning);
674:
675: if (_queue->getSize())
676: {
677: ATOMIC_ADD(_enqueueCount, 1);
678:
679: if (!_serviceActive)
680: {
681: if (sync)
682: dequeue();
683: else
684: scheduleServiceThread();
685:
686: started = true;
687: }
688: }
689:
690: return started;
691: }
692:
693: //===========================================================================
694: // IOOQGateFIFOQueue
695: //===========================================================================
696:
697: #undef super
698: #define super IOOutputQueue
699: OSDefineMetaClassAndStructors( IOOQGateFIFOQueue, IOOutputQueue )
700:
701: #define GATED_DEQUEUE _gate->runCommand()
702:
703: //---------------------------------------------------------------------------
704: // Initialize an IOOQGateFIFOQueue instance.
705: //
706: // target: The object that shall receive packets from the queue,
707: // and is usually a subclass of IONetworkController. If the target is
708: // not an IONetworkController instance, then the target must immediately
709: // call registerOutputHandler() after initializing the queue.
710: //
711: // workloop: A workloop object provided by the target that the
712: // queue will use to add an internal IOCommandGate as an event source.
713: //
714: // capacity: The initial capacity of the output queue, defined as
715: // the number of packets that the queue can hold without dropping.
716: //
717: // Returns true if initialized successfully, false otherwise.
718:
719: bool IOOQGateFIFOQueue::init(OSObject * target,
720: IOWorkLoop * workloop,
721: UInt32 capacity = 0)
722: {
723: if (!super::init(target, capacity))
724: return false;
725:
726: // Verify that the IOWorkLoop is valid.
727: //
728: if (!OSDynamicCast(IOWorkLoop, workloop))
729: return false;
730:
731: // Allocate and attach an IOCommandGate instance to the workloop.
732: // Set the default action to gatedDequeue().
733: //
734: _gate = IOCommandGate::commandGate(this,
735: (IOCommandGate::Action) &IOOQGateFIFOQueue:: gatedDequeue);
736: if (!_gate || (workloop->addEventSource(_gate) != kIOReturnSuccess))
737: return false;
738:
739: ATOMIC_SET(_state, kIOOQStateStopped);
740:
741: return true;
742: }
743:
744: //---------------------------------------------------------------------------
745: // Factory method that will construct and initialize an IOOQGateFIFOQueue
746: // instance.
747: //
748: // Returns an IOOQGateFIFOQueue instance upon success, or 0 otherwise.
749:
750: IOOQGateFIFOQueue *
751: IOOQGateFIFOQueue::withTarget(OSObject * target,
752: IOWorkLoop * workloop,
753: UInt32 capacity = 0)
754: {
755: IOOQGateFIFOQueue * queue = new IOOQGateFIFOQueue;
756:
757: if (queue && !queue->init(target, workloop, capacity))
758: {
759: queue->release();
760: queue = 0;
761: }
762: return queue;
763: }
764:
765: //---------------------------------------------------------------------------
766: // Frees the IOOQGateFIFOQueue instance.
767:
768: void IOOQGateFIFOQueue::free()
769: {
770: cancelServiceThread();
771:
772: if (_gate)
773: _gate->release();
774:
775: super::free();
776: }
777:
778: //---------------------------------------------------------------------------
779: // Provide an implementation for the interface defined in IOOutputQueue.
780: // This method is called by a callout thread when service() is performed
781: // asynchronously.
782:
783: void IOOQGateFIFOQueue::serviceThread()
784: {
785: // Bump the enqueue count, and service the queue if 'serviceActive'
786: // is false.
787:
788: ATOMIC_ADD(_enqueueCount, 1);
789:
790: if ((_state == kIOOQStateRunning) && !_serviceActive)
791: {
792: GATED_DEQUEUE;
793: }
794: }
795:
796: //---------------------------------------------------------------------------
797: // Handles packet (or a packet chain) sent to the queue. This method can
798: // handle calls from multiple simultaneous client threads. A client thread
799: // that calls enqueue() will acquire a mutex lock and call dequeue() if it
800: // detects that no other thread is actively dequeueing packets from the queue.
801: // The dequeue() method will return when the queue becomes empty, or if the
802: // target stalls the queue. This method may block its caller.
803: //
804: // m: The packet (or a packet chain) to be queued for transmission.
805:
806: UInt32 IOOQGateFIFOQueue::enqueue(struct mbuf * m)
807: {
808: UInt32 dropped = _queue->lockEnqueue(m);
809:
810: // Increment the dropped packet counter.
811: if (dropped)
812: ATOMIC_ADD(_stats->dropCount, dropped);
813:
814: // Bump the enqueue count, and service the queue if 'serviceActive'
815: // is false.
816:
817: ATOMIC_ADD(_enqueueCount, 1);
818:
819: if ((_state == kIOOQStateRunning) && !_serviceActive)
820: {
821: GATED_DEQUEUE;
822: }
823:
824: return dropped;
825: }
826:
827: //---------------------------------------------------------------------------
828: // Responsible for removing all packets from the queue and calling
829: // the target's output handler. This method returns when the queue becomes
830: // empty or if the queue's state is no longer kIOOQStateRunning. The target's
831: // output handler is called for every packet removed from the queue. Only a
832: // single packet is sent to the target for every call, the packets are never
833: // chained. An IOCommandGate attached to an workloop provided by the target
834: // ensures mutual exclusion between the dequeueing action (and calls to the
835: // target's output handler), and any other action performed by the workloop's
836: // thread.
837:
838: void IOOQGateFIFOQueue::dequeue()
839: {
840: GATED_DEQUEUE;
841: }
842:
843: void IOOQGateFIFOQueue::gatedDequeue()
844: {
845: struct mbuf * m;
846: UInt32 r;
847: UInt32 dequeueCount;
848:
849: do {
850: // Take a snapshot of the current enqueueCount.
851: //
852: dequeueCount = _enqueueCount;
853:
854: _serviceActive = true; // -- mark dequeuing active --
855:
856: while ((_state == kIOOQStateRunning) &&
857: _queue->getSize() && (m = _queue->lockDequeue()))
858: {
859: ATOMIC_SET(_state, kIOOQStateStalled);
860:
861: // Call the controller's output handler.
862: //
863: r = (_target->*_outAction)(m);
864:
865: // Decide what should happen to the dequeued packet and
866: // update statistics counters.
867: //
868: switch (r & kIOOQReturnStatusMask)
869: {
870: default:
871: case kIOOQReturnStatusDropped:
872: ATOMIC_ADD(_stats->dropCount, 1);
873: break;
874:
875: case kIOOQReturnStatusSuccess:
876: ATOMIC_ADD(_stats->outputCount, 1);
877: break;
878:
879: case kIOOQReturnStatusRetry:
880: _queue->lockPrepend(m);
881: ATOMIC_ADD(_stats->retryCount, 1);
882: break;
883: }
884:
885: // Handle the requested action.
886: //
887: switch (r & kIOOQReturnActionMask)
888: {
889: default:
890: case kIOOQReturnActionNone:
891: ATOMIC_SET(_state, kIOOQStateRunning);
892: break;
893:
894: case kIOOQReturnActionStall:
895: ATOMIC_ADD(_stats->stallCount, 1);
896: break;
897: }
898:
899: // Consume all enqueueCounts before looping.
900: //
901: dequeueCount = _enqueueCount;
902:
903: } /* while [ running and queue has packets ] */
904:
905: _serviceActive = false; // -- mark dequeuing inactive --
906:
907: } while (dequeueCount != _enqueueCount);
908: }
909:
910: //---------------------------------------------------------------------------
911: // This method is called by the target to start the queue. Once started,
912: // the queue will be allowed to call the target's output handler.
913: // Before that, with the queue stopped, the queue will absorb incoming
914: // packets sent to the enqueue() method, but no packets will be dequeued,
915: // and the target's output handler will not be called.
916: //
917: // Returns true if the queue was successfully started, false otherwise.
918:
919: bool IOOQGateFIFOQueue::start()
920: {
921: service(true);
922: return true; // always return success
923: }
924:
925: //---------------------------------------------------------------------------
926: // Stops the queue to prevent it from calling the target's output handler.
927: // This call is synchronous the caller may block. The target's output handler
928: // must never call this method, or any other queue methods.
929:
930: void IOOQGateFIFOQueue::stop()
931: {
932: _gate->runAction( (IOCommandGate::Action) &IOOQGateFIFOQueue::gatedStop );
933: }
934:
935: void IOOQGateFIFOQueue::gatedStop()
936: {
937: ATOMIC_SET(_state, kIOOQStateStopped);
938: }
939:
940: //---------------------------------------------------------------------------
941: // If the queue becomes stalled, then service() must be called to restart
942: // the queue when the target is ready to accept more packets. If the queue
943: // is not empty, this method will also call dequeue(). Note that if the
944: // target never sends a kIOOQReturnActionStall action code to the queue,
945: // then the queue will never stall on its own accord. Calling this method
946: // on a running queue that is not stalled is harmless.
947: //
948: // sync: True if the service action should be performed synchronously,
949: // false to perform the action asynchronously without blocking the caller,
950: // but with a much higher latency cost.
951: //
952: // Returns true if the queue needed servicing, false otherwise.
953:
954: bool IOOQGateFIFOQueue::service(bool sync = true)
955: {
956: bool started = false;
957:
958: ATOMIC_SET(_state, kIOOQStateRunning);
959:
960: if (_queue->getSize())
961: {
962: ATOMIC_ADD(_enqueueCount, 1);
963:
964: if (!_serviceActive)
965: {
966: if (sync)
967: GATED_DEQUEUE;
968: else
969: scheduleServiceThread();
970:
971: started = true;
972: }
973: }
974:
975: return started;
976: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.