|
|
1.1 root 1: #include "u.h"
2: #include "../port/lib.h"
3: #include "mem.h"
4: #include "dat.h"
5: #include "fns.h"
6: #include "io.h"
7: #include "../port/error.h"
8: #include "devtab.h"
9:
10: /*
11: * Part 1) Blocks
12: */
13:
14: /*
15: * Allocate a block. Put the data portion at the end of the smalloc'd
16: * chunk so that it can easily grow from the front to add protocol
17: * headers. Thank Larry Peterson for the suggestion.
18: */
19: Block *
20: allocb(ulong size)
21: {
22: Block *bp;
23: uchar *base, *lim;
24: int n;
25:
26: bp = smalloc(sizeof(Block)+size);
27:
28: base = (uchar*)bp + sizeof(Block);
29: n = msize(bp);
30: lim = (uchar*)bp + n;
31: n -= size + sizeof(Block);
32: if(n > 0)
33: memset(lim - n, 0, n);
34: bp->wptr = bp->rptr = lim - size;
35: bp->base = base;
36: bp->lim = lim;
37: bp->flags = 0;
38: bp->next = 0;
39: bp->list = 0;
40: bp->type = M_DATA;
41: bp->pc = getcallerpc(((uchar*)&size) - sizeof(size));
42: return bp;
43: }
44:
45: /*
46: * Free a block (or list of blocks). Poison its pointers so that
47: * someone trying to access it after freeing will cause a panic.
48: */
49: void
50: freeb(Block *bp)
51: {
52: Block *next;
53:
54: while(bp){
55: bp->rptr = 0;
56: bp->wptr = 0;
57: next = bp->next;
58: free(bp);
59: bp = next;
60: }
61: }
62:
63: /*
64: * Pad a block to the front with n bytes. This is used to add protocol
65: * headers to the front of blocks.
66: */
67: Block *
68: padb(Block *bp, int n)
69: {
70: Block *nbp;
71:
72: if(bp->base && bp->rptr-bp->base>=n){
73: bp->rptr -= n;
74: return bp;
75: } else {
76: nbp = allocb(n);
77: nbp->wptr = nbp->lim;
78: nbp->rptr = nbp->wptr - n;
79: nbp->next = bp;
80: return nbp;
81: }
82: }
83:
84: /*
85: * make sure the first block has n bytes
86: */
87: Block *
88: pullup(Block *bp, int n)
89: {
90: Block *nbp;
91: int i;
92:
93: /*
94: * this should almost always be true, the rest it
95: * just for to avoid every caller checking.
96: */
97: if(BLEN(bp) >= n)
98: return bp;
99:
100: /*
101: * if not enough room in the first block,
102: * add another to the front of the list.
103: */
104: if(bp->lim - bp->rptr < n){
105: nbp = allocb(n);
106: nbp->next = bp;
107: bp = nbp;
108: }
109:
110: /*
111: * copy bytes from the trailing blocks into the first
112: */
113: n -= BLEN(bp);
114: while(nbp = bp->next){
115: i = BLEN(nbp);
116: if(i >= n) {
117: memmove(bp->wptr, nbp->rptr, n);
118: bp->wptr += n;
119: nbp->rptr += n;
120: return bp;
121: } else {
122: memmove(bp->wptr, nbp->rptr, i);
123: bp->wptr += i;
124: bp->next = nbp->next;
125: nbp->next = 0;
126: freeb(nbp);
127: n -= i;
128: }
129: }
130: freeb(bp);
131: return 0;
132: }
133:
134: /*
135: * return the number of data bytes of a list of blocks
136: */
137: int
138: blen(Block *bp)
139: {
140: int len;
141:
142: len = 0;
143: while(bp) {
144: len += BLEN(bp);
145: bp = bp->next;
146: }
147:
148: return len;
149: }
150:
151: /*
152: * round a block chain to some even number of bytes. Used
153: * by devip.c becuase all IP packets must have an even number
154: * of bytes.
155: *
156: * The last block in the returned chain will have S_DELIM set.
157: */
158: int
159: bround(Block *bp, int amount)
160: {
161: Block *last;
162: int len, pad;
163:
164: len = 0;
165: SET(last); /* Ken's magic */
166:
167: while(bp) {
168: len += BLEN(bp);
169: last = bp;
170: bp = bp->next;
171: }
172:
173: pad = ((len + amount) & ~amount) - len;
174: if(pad) {
175: if(last->lim - last->wptr >= pad){
176: memset(last->wptr, 0, pad);
177: last->wptr += pad;
178: } else {
179: last->next = allocb(pad);
180: last->flags &= ~S_DELIM;
181: last = last->next;
182: last->wptr += pad;
183: last->flags |= S_DELIM;
184: }
185: }
186:
187: return len + pad;
188: }
189:
190: /*
191: * expand a block list to be one block, len bytes long. used by
192: * ethernet routines.
193: */
194: Block*
195: expandb(Block *bp, int len)
196: {
197: Block *nbp, *new;
198: int i;
199: ulong delim = 0;
200:
201: new = allocb(len);
202: if(new == 0){
203: freeb(bp);
204: return 0;
205: }
206:
207: /*
208: * copy bytes into new block
209: */
210: for(nbp = bp; len>0 && nbp; nbp = nbp->next){
211: delim = nbp->flags & S_DELIM;
212: i = BLEN(nbp);
213: if(i > len) {
214: memmove(new->wptr, nbp->rptr, len);
215: new->wptr += len;
216: break;
217: } else {
218: memmove(new->wptr, nbp->rptr, i);
219: new->wptr += i;
220: len -= i;
221: }
222: }
223: if(len){
224: memset(new->wptr, 0, len);
225: new->wptr += len;
226: }
227: new->flags |= delim;
228: freeb(bp);
229: return new;
230:
231: }
232:
233: /*
234: * make a copy of the first 'count' bytes of a block chain. Use
235: * by transport protocols.
236: */
237: Block *
238: copyb(Block *bp, int count)
239: {
240: Block *nb, *head, **p;
241: int l;
242:
243: p = &head;
244: while(count) {
245: l = BLEN(bp);
246: if(count < l)
247: l = count;
248: nb = allocb(l);
249: if(nb == 0)
250: panic("copyb.1");
251: memmove(nb->wptr, bp->rptr, l);
252: nb->wptr += l;
253: count -= l;
254: if(bp->flags & S_DELIM)
255: nb->flags |= S_DELIM;
256: *p = nb;
257: p = &nb->next;
258: bp = bp->next;
259: if(bp == 0)
260: break;
261: }
262: if(count) {
263: nb = allocb(count);
264: if(nb == 0)
265: panic("copyb.2");
266: memset(nb->wptr, 0, count);
267: nb->wptr += count;
268: nb->flags |= S_DELIM;
269: *p = nb;
270: }
271: if(blen(head) == 0)
272: print("copyb: zero length\n");
273:
274: return head;
275: }
276:
277: /*
278: * Part 2) Queues
279: */
280:
/*
 *  process end line discipline.
 *
 *  This is the Qinfo given to the queue pair created at the process
 *  end of every stream (see streamnew).
 */
static Streamput stputq;
Qinfo procinfo =
{
	/*
	 *  NOTE(review): positional initializer; presumably the fields
	 *  are iput, oput, open, close, name in that order -- confirm
	 *  against the declaration of Qinfo.
	 */
	stputq,
	nullput,
	0,
	0,
	"process"
};

/*
 *  line disciplines that can be pushed: a list linked through
 *  Qinfo.next, added to by newqinfo and searched by qinfofind.
 */
static Qinfo *lds;
298:
299: /*
300: * make known a stream module and call its initialization routine, if
301: * it has one.
302: */
303: void
304: newqinfo(Qinfo *qi)
305: {
306: if(qi->next)
307: panic("newqinfo: already configured");
308:
309: qi->next = lds;
310: lds = qi;
311: if(qi->reset)
312: (*qi->reset)();
313: }
314:
315: /*
316: * find the info structure for line discipline 'name'
317: */
318: Qinfo *
319: qinfofind(char *name)
320: {
321: Qinfo *qi;
322:
323: if(name == 0)
324: return 0;
325: for(qi = lds; qi; qi = qi->next)
326: if(strcmp(qi->name, name)==0)
327: return qi;
328: return 0;
329: }
330:
331: /*
332: * allocate a pair of queues. flavor them with the requested put routines.
333: * the `QINUSE' flag on the read side is the only one used.
334: */
335: static Queue *
336: allocq(Qinfo *qi)
337: {
338: Queue *q, *wq;
339:
340: q = smalloc(2*sizeof(Queue));
341:
342: q->flag = QINUSE;
343: q->r.p = 0;
344: q->info = qi;
345: q->put = qi->iput;
346: q->len = q->nb = 0;
347: q->ptr = 0;
348: q->rp = &q->r;
349: wq = q->other = q + 1;
350:
351: wq->flag = QINUSE;
352: wq->r.p = 0;
353: wq->info = qi;
354: wq->put = qi->oput;
355: wq->other = q;
356: wq->ptr = 0;
357: wq->len = wq->nb = 0;
358: wq->rp = &wq->r;
359:
360: return q;
361: }
362:
363: /*
364: * free a queue
365: */
366: static void
367: freeq(Queue *q)
368: {
369: Block *bp;
370:
371: q = RD(q);
372: while(bp = getq(q))
373: freeb(bp);
374: q = WR(q);
375: while(bp = getq(q))
376: freeb(bp);
377: free(RD(q));
378: }
379:
380: /*
381: * flush a queue
382: */
383: static void
384: flushq(Queue *q)
385: {
386: Block *bp;
387:
388: q = RD(q);
389: while(bp = getq(q))
390: freeb(bp);
391: q = WR(q);
392: while(bp = getq(q))
393: freeb(bp);
394: }
395:
/*
 *  Push a line discipline onto a stream.
 *
 *  A new queue pair flavored by qi is allocated and spliced in
 *  directly below the process-end pair (s->procq): the new read
 *  queue feeds the process read queue, and the process write queue
 *  feeds the new write queue.  The splice is done holding the
 *  stream's qlock; the discipline's open routine, if any, is called
 *  after the lock has been released.
 *
 *  Returns the write queue that follows the new write queue.
 */
Queue *
pushq(Stream* s, Qinfo *qi)
{
	Queue *q;
	Queue *nq;

	q = RD(s->procq);

	/*
	 *  make the new queue
	 */
	nq = allocq(qi);

	/*
	 *  push
	 */
	qlock(s);
	RD(nq)->next = q;			/* new read side feeds the process end */
	RD(WR(q)->next)->next = RD(nq);		/* old top module now feeds the new one */
	WR(nq)->next = WR(q)->next;		/* new write side feeds the old top module */
	WR(q)->next = WR(nq);			/* process end writes into the new module */
	qunlock(s);

	if(qi->open)
		(*qi->open)(RD(nq), s);

	return WR(nq)->next;
}
427:
428: /*
429: * pop off the top line discipline
430: */
431: static void
432: popq(Stream *s)
433: {
434: Queue *q;
435:
436: if(waserror()){
437: qunlock(s);
438: nexterror();
439: }
440: qlock(s);
441: if(s->procq->next == WR(s->devq))
442: error(Ebadld);
443: q = s->procq->next;
444: if(q->info->close)
445: (*q->info->close)(RD(q));
446: s->procq->next = q->next;
447: RD(q->next)->next = RD(s->procq);
448: qunlock(s);
449: freeq(q);
450: }
451:
452: /*
453: * add a block (or list of blocks) to the end of a queue. return true
454: * if one of the blocks contained a delimiter.
455: */
456: int
457: putq(Queue *q, Block *bp)
458: {
459: int delim;
460:
461: lock(q);
462: if(q->first)
463: q->last->next = bp;
464: else
465: q->first = bp;
466: q->len += BLEN(bp);
467: q->nb++;
468: delim = bp->flags & S_DELIM;
469: while(bp->next) {
470: bp = bp->next;
471: q->len += BLEN(bp);
472: q->nb++;
473: delim |= bp->flags & S_DELIM;
474: }
475: q->last = bp;
476: if(q->len >= Streamhi || q->nb >= Streambhi)
477: q->flag |= QHIWAT;
478: unlock(q);
479: return delim;
480: }
481:
482: int
483: putb(Blist *q, Block *bp)
484: {
485: int delim;
486:
487: if(q->first)
488: q->last->next = bp;
489: else
490: q->first = bp;
491: q->len += BLEN(bp);
492: delim = bp->flags & S_DELIM;
493: while(bp->next) {
494: bp = bp->next;
495: q->len += BLEN(bp);
496: delim |= bp->flags & S_DELIM;
497: }
498: q->last = bp;
499: return delim;
500: }
501:
502: /*
503: * add a block to the start of a queue
504: */
505: void
506: putbq(Blist *q, Block *bp)
507: {
508: lock(q);
509: if(q->first)
510: bp->next = q->first;
511: else
512: q->last = bp;
513: q->first = bp;
514: q->len += BLEN(bp);
515: q->nb++;
516: unlock(q);
517: }
518:
519: /*
520: * remove the first block from a queue
521: */
522: Block *
523: getq(Queue *q)
524: {
525: Block *bp;
526:
527: lock(q);
528: bp = q->first;
529: if(bp) {
530: q->first = bp->next;
531: if(q->first == 0)
532: q->last = 0;
533: q->len -= BLEN(bp);
534: q->nb--;
535: if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){
536: wakeup(q->other->next->other->rp);
537: q->flag &= ~QHIWAT;
538: }
539: bp->next = 0;
540: }
541: unlock(q);
542: return bp;
543: }
544:
545: /*
546: * grab all the blocks in a queue
547: */
548: Block *
549: grabq(Queue *q)
550: {
551: Block *bp;
552:
553: lock(q);
554: bp = q->first;
555: if(bp){
556: q->first = 0;
557: q->last = 0;
558: q->len = 0;
559: q->nb = 0;
560: if(q->flag&QHIWAT){
561: wakeup(q->other->next->other->rp);
562: q->flag &= ~QHIWAT;
563: }
564: }
565: unlock(q);
566: return bp;
567: }
568:
569: /*
570: * remove the first block from a list of blocks
571: */
572: Block *
573: getb(Blist *q)
574: {
575: Block *bp;
576:
577: bp = q->first;
578: if(bp) {
579: q->first = bp->next;
580: if(q->first == 0)
581: q->last = 0;
582: q->len -= BLEN(bp);
583: bp->next = 0;
584: }
585: return bp;
586: }
587:
588:
589: /*
590: * put a block into the bit bucket
591: */
592: void
593: nullput(Queue *q, Block *bp)
594: {
595: USED(q);
596: if(bp->type == M_HANGUP)
597: freeb(bp);
598: else {
599: freeb(bp);
600: error(Ehungup);
601: }
602: }
603:
604: /*
605: * Part 3) Streams
606: */
607:
/*
 *  the per stream directory structure: every stream offers a
 *  `data' and a `ctl' file (see streamgen).
 */
Dirtab streamdir[]={
	"data",		{Sdataqid},	0,	0600,
	"ctl",		{Sctlqid},	0,	0600,
};

/*
 *  hash buckets containing all streams.  The bucket for a stream is
 *  chosen by hash() from its type, dev and id.
 */
enum
{
	Nbits=	5,
	Nhash=	1<<Nbits,	/* number of buckets */
	Nmask=	Nhash-1,	/* mask reducing a hash value to a bucket index */
};
typedef struct Sthash Sthash;
struct Sthash
{
	QLock;			/* held while walking or changing the chain */
	Stream	*s;		/* streams in this bucket, linked through Stream.next */
};
static Sthash ht[Nhash];

static void hangup(Stream*);
634:
635: /*
636: * A stream device consists of the contents of streamdir plus
637: * any directory supplied by the actual device.
638: *
639: * values of s:
640: * 0 to ntab-1 apply to the auxiliary directory.
641: * ntab to ntab+Shighqid-Slowqid+1 apply to streamdir.
642: */
643: int
644: streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp)
645: {
646: if(s < ntab)
647: tab = &tab[s];
648: else if(s < ntab + Shighqid - Slowqid + 1)
649: tab = &streamdir[s - ntab];
650: else
651: return -1;
652:
653: devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0},
654: tab->name, tab->length, eve, tab->perm, dp);
655: return 1;
656: }
657:
658: /*
659: * return a hash bucket for a stream
660: */
661: static Sthash*
662: hash(int type, int dev, int id)
663: {
664: return &ht[(type*7*7 + dev*7 + id) & Nmask];
665: }
666:
/*
 *  Find or create the stream identified by (type, dev, id).
 *
 *  If the stream already exists its use count is incremented and,
 *  unless noopen is non-zero, so is its open count.  Otherwise a new
 *  stream is created with a process-end queue pair (procinfo) wired
 *  directly to a device-end queue pair flavored by qi, and qi's open
 *  routine, if any, is called.
 *
 *  Returns the stream.  An error raised while setting up a new stream
 *  is passed on after the stream has been closed again.
 */
Stream *
streamnew(ushort type, ushort dev, ulong id, Qinfo *qi, int noopen)
{
	Stream *s;
	Queue *q;
	Sthash *hb;

	hb = hash(type, dev, id);

	/*
	 *  if the stream already exists, just increment the reference counts.
	 */
	qlock(hb);
	for(s = hb->s; s; s = s->next) {
		if(s->type == type && s->dev == dev && s->id == id){
			s->inuse++;
			qunlock(hb);
			if(noopen == 0){
				qlock(s);
				s->opens++;
				qunlock(s);
			}
			return s;
		}
	}

	/*
	 *  create and init a new stream
	 */
	s = smalloc(sizeof(Stream));
	s->inuse = 1;
	s->type = type;
	s->dev = dev;
	s->id = id;
	s->err = 0;
	s->hread = 0;
	s->next = hb->s;
	hb->s = s;

	/*
	 *  The ordering of these 2 instructions is very important.
	 *  It makes sure we finish the stream initialization before
	 *  anyone else can access it.
	 */
	qlock(s);
	qunlock(hb);

	/*
	 *  NOTE(review): with noopen set, s->opens is 0 below, so the
	 *  decrement in streamclose1 takes it to -1 and streamexit then
	 *  panics on opens != 0 -- confirm whether an open routine can
	 *  fail for a noopen stream.
	 */
	if(waserror()){
		qunlock(s);
		streamclose1(s);
		nexterror();
	}

	/*
	 *  hang a device and process q off the stream
	 */
	if(noopen)
		s->opens = 0;
	else
		s->opens = 1;
	q = allocq(&procinfo);
	WR(q)->ptr = s;
	RD(q)->ptr = s;
	s->procq = WR(q);
	q = allocq(qi);
	s->devq = RD(q);
	WR(s->procq)->next = WR(s->devq);	/* writes flow from process end to device end */
	RD(s->procq)->next = 0;
	RD(s->devq)->next = RD(s->procq);	/* reads flow from device end to process end */
	WR(s->devq)->next = 0;

	if(qi->open)
		(*qi->open)(RD(s->devq), s);

	qunlock(s);
	poperror();
	return s;
}
748:
749: /*
750: * Associate a stream with a channel
751: */
752: void
753: streamopen(Chan *c, Qinfo *qi)
754: {
755: c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0);
756: }
757:
758: /*
759: * Enter a stream only if the stream exists and is open. Increment the
760: * reference count so it can't disappear under foot.
761: *
762: * Return -1 if the stream no longer exists or is not opened.
763: */
764: int
765: streamenter(Stream *s)
766: {
767: Sthash *hb;
768: Stream *ns;
769:
770: hb = hash(s->type, s->dev, s->id);
771: qlock(hb);
772: for(ns = hb->s; ns; ns = ns->next)
773: if(s->type == ns->type && s->dev == ns->dev && s->id == ns->id){
774: s->inuse++;
775: qunlock(hb);
776: if(s->opens == 0){
777: streamexit(s);
778: return -1;
779: }
780: return 0;
781: }
782: qunlock(hb);
783: return -1;
784: }
785:
786: /*
787: * Decrement the reference count on a stream. If the count is
788: * zero, free the stream.
789: */
790: void
791: streamexit(Stream *s)
792: {
793: Queue *q;
794: Queue *nq;
795: Sthash *hb;
796: Stream **l, *ns;
797:
798: hb = hash(s->type, s->dev, s->id);
799: qlock(hb);
800: if(s->inuse-- == 1){
801: if(s->opens != 0)
802: panic("streamexit %d %s\n", s->opens, s->devq->info->name);
803:
804: /*
805: * ascend the stream freeing the queues
806: */
807: for(q = s->devq; q; q = nq){
808: nq = q->next;
809: freeq(q);
810: }
811: if(s->err)
812: freeb(s->err);
813:
814: /*
815: * unchain it from the hash bucket and free
816: */
817: l = &hb->s;
818: for(ns = hb->s; ns; ns = ns->next){
819: if(s == ns){
820: *l = s->next;
821: break;
822: }
823: l = &ns->next;
824: }
825: free(s);
826: }
827: qunlock(hb);
828: }
829:
830: /*
831: * nail down a stream so that it can't be closed
832: */
833: void
834: naildownstream(Stream *s)
835: {
836: s->opens++;
837: s->inuse++;
838: }
839:
/*
 *  Decrement the open count.  When it goes to zero, call the close
 *  routines for each queue in the stream, replace the write side put
 *  routines by nullput, and flush all queued blocks.  The use count
 *  is then dropped through streamexit, which may free the stream.
 *
 *  Returns the open count remaining after the decrement.
 */
int
streamclose1(Stream *s)
{
	Queue *q, *nq;
	int rv;

	/*
	 *  decrement the open count
	 */
	qlock(s);
	if(s->opens-- == 1){
		/*
		 *  descend the stream closing the queues
		 */
		for(q = s->procq; q; q = q->next){
			/*
			 *  an error raised by a close routine is swallowed
			 *  so that the remaining modules still get closed
			 */
			if(!waserror()){
				if(q->info->close)
					(*q->info->close)(q->other);
				poperror();
			}
			WR(q)->put = nullput;

			/*
			 *  this may be 2 streams joined device end to device end
			 */
			if(q == s->devq->other)
				break;
		}

		/*
		 *  ascend the stream flushing the queues
		 */
		for(q = s->devq; q; q = nq){
			nq = q->next;
			flushq(q);
		}
	}
	/* sample the count while still locked; s may be gone after streamexit */
	rv = s->opens;
	qunlock(s);

	/*
	 *  leave it and free it
	 */
	streamexit(s);
	return rv;
}
890: int
891: streamclose(Chan *c)
892: {
893: /*
894: * if no stream, ignore it
895: */
896: if(!c->stream)
897: return 0;
898: return streamclose1(c->stream);
899: }
900:
901: /*
902: * put a block to be read into the queue. wakeup any waiting reader
903: */
904: void
905: stputq(Queue *q, Block *bp)
906: {
907: int awaken;
908: Stream *s;
909:
910: if(bp->type == M_HANGUP){
911: s = q->ptr;
912: if(bp->rptr<bp->wptr && s->err==0)
913: s->err = bp;
914: else
915: freeb(bp);
916: q->flag |= QHUNGUP;
917: q->other->flag |= QHUNGUP;
918: wakeup(q->other->rp);
919: awaken = 1;
920: } else {
921: lock(q);
922: if(q->first)
923: q->last->next = bp;
924: else
925: q->first = bp;
926: q->len += BLEN(bp);
927: q->nb++;
928: awaken = bp->flags & S_DELIM;
929: while(bp->next) {
930: bp = bp->next;
931: q->len += BLEN(bp);
932: q->nb++;
933: awaken |= bp->flags & S_DELIM;
934: }
935: q->last = bp;
936: if(q->len >= Streamhi || q->nb >= Streambhi){
937: q->flag |= QHIWAT;
938: awaken = 1;
939: }
940: unlock(q);
941: }
942: if(awaken)
943: wakeup(q->rp);
944: }
945:
946: /*
947: * return the stream id
948: */
949: long
950: streamctlread(Chan *c, void *vbuf, long n)
951: {
952: char *buf = vbuf;
953: char num[32];
954: Stream *s;
955:
956: s = c->stream;
957: if(STREAMTYPE(c->qid.path) == Sctlqid){
958: sprint(num, "%d", s->id);
959: return readstr(c->offset, buf, n, num);
960: } else {
961: if(CHDIR & c->qid.path)
962: return devdirread(c, vbuf, n, 0, 0, streamgen);
963: else
964: panic("streamctlread");
965: }
966: return 0; /* not reached */
967: }
968:
969: /*
970: * return true if there is an output buffer available
971: */
972: static int
973: isinput(void *x)
974: {
975: Queue *q;
976:
977: q = (Queue *)x;
978: return (q->flag&QHUNGUP) || q->first!=0;
979: }
980:
/*
 *  Read from a stream until the buffer is full or a block carrying
 *  S_DELIM has been consumed.  Reads of anything but the data file
 *  are passed to streamctlread.
 *
 *  Returns the number of bytes copied into vbuf.  On a hung up
 *  stream with nothing queued, the saved error (if any) is raised;
 *  otherwise the first few reads return what has been read so far
 *  and later ones raise Ehungup.
 */
long
streamread(Chan *c, void *vbuf, long n)
{
	Block *bp;
	Block *tofree;
	Stream *s;
	Queue *q;
	int left, i;
	uchar *buf = vbuf;

	if(STREAMTYPE(c->qid.path) != Sdataqid)
		return streamctlread(c, vbuf, n);

	/*
	 *  one reader at a time
	 */
	s = c->stream;
	left = n;
	qlock(&s->rdlock);
	tofree = 0;
	q = 0;
	if(waserror()){
		/*
		 *  put any partially read message back into the
		 *  queue.  tofree holds the blocks newest first, so
		 *  pushing each onto the front of the queue restores
		 *  the original order.
		 */
		while(tofree){
			bp = tofree;
			tofree = bp->next;
			bp->next = 0;
			putbq(q, bp);
		}
		qunlock(&s->rdlock);
		nexterror();
	}

	/*
	 *  sleep till data is available
	 */
	q = RD(s->procq);
	while(left){
		bp = getq(q);
		if(bp == 0){
			if(q->flag & QHUNGUP){
				if(s->err)
					error((char*)s->err->rptr);
				else if(s->hread++<3)
					break;
				else
					error(Ehungup);
			}
			q->rp = &q->r;
			sleep(q->rp, isinput, (void *)q);
			continue;
		}

		i = BLEN(bp);
		if(i <= left){
			/*
			 *  whole block fits: copy it out but leave rptr
			 *  alone and hold on to the block, so it can be
			 *  put back intact if an error occurs.
			 */
			memmove(buf, bp->rptr, i);
			left -= i;
			buf += i;
			bp->next = tofree;
			tofree = bp;
			if(bp->flags & S_DELIM)
				break;
		} else {
			/*
			 *  block is bigger than what is wanted: take the
			 *  front and put the rest back on the queue.
			 */
			memmove(buf, bp->rptr, left);
			bp->rptr += left;
			putbq(q, bp);
			left = 0;
		}
	}

	/*
	 *  free completely read blocks
	 */
	if(tofree)
		freeb(tofree);

	qunlock(&s->rdlock);
	poperror();
	return n - left;
}
1067:
1068: /*
1069: * look for an instance of the line discipline `name' on
1070: * the stream `s'
1071: */
1072: void
1073: qlook(Stream *s, char *name)
1074: {
1075: Queue *q;
1076:
1077: for(q = s->procq; q; q = q->next){
1078: if(strcmp(q->info->name, name) == 0)
1079: return;
1080:
1081: /*
1082: * this may be 2 streams joined device end to device end
1083: */
1084: if(q == s->devq->other)
1085: break;
1086: }
1087: error(Ebadarg);
1088: }
1089:
1090: /*
1091: * Handle a ctl request. Streamwide requests are:
1092: *
1093: * hangup -- send an M_HANGUP up the stream
1094: * push ldname -- push the line discipline named ldname
1095: * pop -- pop a line discipline
1096: * look ldname -- look for a line discipline
1097: *
1098: * This routing is entered with s->wrlock'ed and must unlock.
1099: */
1100: static long
1101: streamctlwrite(Chan *c, void *a, long n)
1102: {
1103: Qinfo *qi;
1104: Block *bp;
1105: Stream *s;
1106:
1107: if(STREAMTYPE(c->qid.path) != Sctlqid)
1108: panic("streamctlwrite %lux", c->qid);
1109: s = c->stream;
1110:
1111: /*
1112: * package
1113: */
1114: bp = allocb(n+1);
1115: memmove(bp->wptr, a, n);
1116: bp->wptr[n] = 0;
1117: bp->wptr += n + 1;
1118:
1119: /*
1120: * check for standard requests
1121: */
1122: if(streamparse("hangup", bp)){
1123: hangup(s);
1124: freeb(bp);
1125: } else if(streamparse("push", bp)){
1126: qi = qinfofind((char *)bp->rptr);
1127: if(qi == 0)
1128: error(Ebadld);
1129: pushq(s, qi);
1130: freeb(bp);
1131: } else if(streamparse("pop", bp)){
1132: popq(s);
1133: freeb(bp);
1134: } else if(streamparse("look", bp)){
1135: qlook(s, (char *)bp->rptr);
1136: freeb(bp);
1137: } else {
1138: bp->type = M_CTL;
1139: bp->flags |= S_DELIM;
1140: PUTNEXT(s->procq, bp);
1141: }
1142:
1143: return n;
1144: }
1145:
1146: /*
1147: * wait till there's room in the next stream
1148: */
1149: static int
1150: notfull(void *arg)
1151: {
1152: return !QFULL((Queue *)arg);
1153: }
/*
 *  Pass a block to the next queue, first waiting until there is
 *  room in it.  M_HANGUP blocks are passed on without waiting.
 *
 *  Sleepers are serialized by q->rlock.  If the sleep is interrupted
 *  by an error, the block is freed and the error passed on.
 */
void
flowctl(Queue *q, Block *bp)
{
	if(bp->type != M_HANGUP){
		qlock(&q->rlock);
		if(waserror()){
			qunlock(&q->rlock);
			freeb(bp);
			nexterror();
		}
		q->rp = &q->r;
		sleep(q->rp, notfull, q->next);	/* until q->next is no longer full */
		qunlock(&q->rlock);
		poperror();
	}
	PUTNEXT(q, bp);
}
1171:
1172: /*
1173: * send the request as a single delimited block
1174: */
1175: long
1176: streamwrite(Chan *c, void *a, long n, int docopy)
1177: {
1178: Stream *s;
1179: Queue *q;
1180: long rem;
1181: int i;
1182: Block *bp;
1183: char *va;
1184:
1185: /*
1186: * docopy will get used if I ever figure out when to avoid copying
1187: * data. -- presotto
1188: */
1189: USED(docopy);
1190:
1191: s = c->stream;
1192:
1193: /*
1194: * decode the qid
1195: */
1196: if(STREAMTYPE(c->qid.path) != Sdataqid)
1197: return streamctlwrite(c, a, n);
1198:
1199: /*
1200: * No writes allowed on hungup channels
1201: */
1202: q = s->procq;
1203: if(q->other->flag & QHUNGUP){
1204: if(s->err)
1205: error((char*)(s->err->rptr));
1206: else
1207: error(Ehungup);
1208: }
1209:
1210: /*
1211: * Write the message using blocks <= Streamhi bytes longs
1212: */
1213: va = a;
1214: rem = n;
1215: for(;;){
1216: if(rem > Streamhi)
1217: i = Streamhi;
1218: else
1219: i = rem;
1220: bp = allocb(i);
1221: memmove(bp->wptr, va, i);
1222: bp->wptr += i;
1223: va += i;
1224: rem -= i;
1225: if(rem > 0){
1226: FLOWCTL(q, bp);
1227: } else {
1228: bp->flags |= S_DELIM;
1229: FLOWCTL(q, bp);
1230: break;
1231: }
1232: }
1233: return n;
1234: }
1235:
1236: /*
1237: * stat a stream. the length is the number of bytes up to the
1238: * first delimiter.
1239: */
1240: void
1241: streamstat(Chan *c, char *db, char *name, long perm)
1242: {
1243: Dir dir;
1244: Stream *s;
1245: Queue *q;
1246: Block *bp;
1247: long n;
1248:
1249: s = c->stream;
1250: n = 0;
1251: if(s) {
1252: q = RD(s->procq);
1253: if(q->flag & QHUNGUP)
1254: error(Ehungup);
1255: lock(q);
1256: for(bp=q->first; bp; bp = bp->next){
1257: n += BLEN(bp);
1258: if(bp->flags&S_DELIM)
1259: break;
1260: }
1261: unlock(q);
1262: }
1263:
1264: devdir(c, c->qid, name, n, eve, perm, &dir);
1265: convD2M(&dir, db);
1266: }
1267:
1268: /*
1269: * send a hangup up a stream
1270: */
1271: static void
1272: hangup(Stream *s)
1273: {
1274: Block *bp;
1275:
1276: bp = allocb(0);
1277: bp->type = M_HANGUP;
1278: if(s->devq && s->devq->put)
1279: (*s->devq->put)(s->devq, bp);
1280: else
1281: freeb(bp);
1282: }
1283:
1284: /*
1285: * parse a string and return a pointer to the second element if the
1286: * first matches name. bp->rptr will be updated to point to the
1287: * second element.
1288: *
1289: * return 0 if no match.
1290: *
1291: * it is assumed that the block data is null terminated. streamwrite
1292: * guarantees this.
1293: */
1294: int
1295: streamparse(char *name, Block *bp)
1296: {
1297: int len;
1298:
1299: len = strlen(name);
1300: if(BLEN(bp) < len)
1301: return 0;
1302: if(strncmp(name, (char *)bp->rptr, len)==0){
1303: if(bp->rptr[len] == ' ')
1304: bp->rptr += len+1;
1305: else if(bp->rptr[len])
1306: return 0;
1307: else
1308: bp->rptr += len;
1309: while(*bp->rptr==' ' && bp->wptr>bp->rptr)
1310: bp->rptr++;
1311: return 1;
1312: }
1313: return 0;
1314: }
1315:
/*
 *  Split the string lp in place into at most n fields separated by
 *  any of the characters in sep (like andrew's getmfields but with
 *  no hidden state).
 *
 *  Separators in front of each field found are overwritten with NULs
 *  and a pointer to the start of the field is stored in fields[].
 *  The separator after the last field stored is left alone, so when
 *  the string holds more than n fields the last one stored carries
 *  the rest of the string.
 *
 *  Returns the number of fields stored; 0 if lp is nil.
 */
int
getfields(char *lp, char **fields, int n, char *sep)
{
	int nf;

	nf = 0;
	if(lp == 0)
		return nf;

	while(*lp != 0 && nf < n){
		/* wipe out separators leading up to the field */
		while(*lp != 0 && strchr(sep, *lp) != 0)
			*lp++ = 0;
		if(*lp == 0)
			break;
		fields[nf++] = lp;

		/* step over the field itself */
		while(*lp != 0 && strchr(sep, *lp) == 0)
			lp++;
	}
	return nf;
}
1335:
static Streamopen permstopen;
static Streamput permstput;

/*
 *  pushing this line discipline makes the stream unclosable, i.e., always there.
 *  Its open routine takes an extra use and open reference on the stream
 *  and its put routines pass every block straight on.
 */
Qinfo perminfo =
{
	/*
	 *  NOTE(review): positional initializer; presumably the fields
	 *  are iput, oput, open, close, name in that order -- confirm
	 *  against the declaration of Qinfo.
	 */
	permstput,
	permstput,
	permstopen,
	0,
	"permanent"
};
1350:
1351: static void
1352: permstopen(Queue *q, Stream *s)
1353: {
1354: USED(q);
1355: s->opens++;
1356: s->inuse++;
1357: }
1358:
1359: static void
1360: permstput(Queue *q, Block *bp)
1361: {
1362: PUTNEXT(q, bp);
1363: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.