|
|
1.1 ! root 1: #include "u.h" ! 2: #include "../port/lib.h" ! 3: #include "mem.h" ! 4: #include "dat.h" ! 5: #include "fns.h" ! 6: #include "io.h" ! 7: #include "../port/error.h" ! 8: #include "devtab.h" ! 9: ! 10: /* ! 11: * Part 1) Blocks ! 12: */ ! 13: ! 14: /* ! 15: * Allocate a block. Put the data portion at the end of the smalloc'd ! 16: * chunk so that it can easily grow from the front to add protocol ! 17: * headers. Thank Larry Peterson for the suggestion. ! 18: */ ! 19: Block * ! 20: allocb(ulong size) ! 21: { ! 22: Block *bp; ! 23: uchar *base, *lim; ! 24: int n; ! 25: ! 26: bp = smalloc(sizeof(Block)+size); ! 27: ! 28: base = (uchar*)bp + sizeof(Block); ! 29: n = msize(bp); ! 30: lim = (uchar*)bp + n; ! 31: n -= size + sizeof(Block); ! 32: if(n > 0) ! 33: memset(lim - n, 0, n); ! 34: bp->wptr = bp->rptr = lim - size; ! 35: bp->base = base; ! 36: bp->lim = lim; ! 37: bp->flags = 0; ! 38: bp->next = 0; ! 39: bp->list = 0; ! 40: bp->type = M_DATA; ! 41: bp->pc = getcallerpc(((uchar*)&size) - sizeof(size)); ! 42: return bp; ! 43: } ! 44: ! 45: /* ! 46: * Free a block (or list of blocks). Poison its pointers so that ! 47: * someone trying to access it after freeing will cause a panic. ! 48: */ ! 49: void ! 50: freeb(Block *bp) ! 51: { ! 52: Block *next; ! 53: ! 54: while(bp){ ! 55: bp->rptr = 0; ! 56: bp->wptr = 0; ! 57: next = bp->next; ! 58: free(bp); ! 59: bp = next; ! 60: } ! 61: } ! 62: ! 63: /* ! 64: * Pad a block to the front with n bytes. This is used to add protocol ! 65: * headers to the front of blocks. ! 66: */ ! 67: Block * ! 68: padb(Block *bp, int n) ! 69: { ! 70: Block *nbp; ! 71: ! 72: if(bp->base && bp->rptr-bp->base>=n){ ! 73: bp->rptr -= n; ! 74: return bp; ! 75: } else { ! 76: nbp = allocb(n); ! 77: nbp->wptr = nbp->lim; ! 78: nbp->rptr = nbp->wptr - n; ! 79: nbp->next = bp; ! 80: return nbp; ! 81: } ! 82: } ! 83: ! 84: /* ! 85: * make sure the first block has n bytes ! 86: */ ! 87: Block * ! 88: pullup(Block *bp, int n) ! 89: { ! 90: Block *nbp; ! 91: int i; ! 92: ! 93: /* ! 94: * this should almost always be true, the rest it ! 95: * just for to avoid every caller checking. ! 96: */ ! 97: if(BLEN(bp) >= n) ! 98: return bp; ! 99: ! 100: /* ! 101: * if not enough room in the first block, ! 102: * add another to the front of the list. ! 103: */ ! 104: if(bp->lim - bp->rptr < n){ ! 105: nbp = allocb(n); ! 106: nbp->next = bp; ! 107: bp = nbp; ! 108: } ! 109: ! 110: /* ! 111: * copy bytes from the trailing blocks into the first ! 112: */ ! 113: n -= BLEN(bp); ! 114: while(nbp = bp->next){ ! 115: i = BLEN(nbp); ! 116: if(i >= n) { ! 117: memmove(bp->wptr, nbp->rptr, n); ! 118: bp->wptr += n; ! 119: nbp->rptr += n; ! 120: return bp; ! 121: } else { ! 122: memmove(bp->wptr, nbp->rptr, i); ! 123: bp->wptr += i; ! 124: bp->next = nbp->next; ! 125: nbp->next = 0; ! 126: freeb(nbp); ! 127: n -= i; ! 128: } ! 129: } ! 130: freeb(bp); ! 131: return 0; ! 132: } ! 133: ! 134: /* ! 135: * return the number of data bytes of a list of blocks ! 136: */ ! 137: int ! 138: blen(Block *bp) ! 139: { ! 140: int len; ! 141: ! 142: len = 0; ! 143: while(bp) { ! 144: len += BLEN(bp); ! 145: bp = bp->next; ! 146: } ! 147: ! 148: return len; ! 149: } ! 150: ! 151: /* ! 152: * round a block chain to some even number of bytes. Used ! 153: * by devip.c becuase all IP packets must have an even number ! 154: * of bytes. ! 155: * ! 156: * The last block in the returned chain will have S_DELIM set. ! 157: */ ! 158: int ! 159: bround(Block *bp, int amount) ! 160: { ! 161: Block *last; ! 162: int len, pad; ! 163: ! 164: len = 0; ! 165: SET(last); /* Ken's magic */ ! 166: ! 167: while(bp) { ! 168: len += BLEN(bp); ! 169: last = bp; ! 170: bp = bp->next; ! 171: } ! 172: ! 173: pad = ((len + amount) & ~amount) - len; ! 174: if(pad) { ! 175: if(last->lim - last->wptr >= pad){ ! 176: memset(last->wptr, 0, pad); ! 177: last->wptr += pad; ! 178: } else { ! 179: last->next = allocb(pad); ! 180: last->flags &= ~S_DELIM; ! 181: last = last->next; ! 182: last->wptr += pad; ! 183: last->flags |= S_DELIM; ! 184: } ! 185: } ! 186: ! 187: return len + pad; ! 188: } ! 189: ! 190: /* ! 191: * expand a block list to be one block, len bytes long. used by ! 192: * ethernet routines. ! 193: */ ! 194: Block* ! 195: expandb(Block *bp, int len) ! 196: { ! 197: Block *nbp, *new; ! 198: int i; ! 199: ulong delim = 0; ! 200: ! 201: new = allocb(len); ! 202: if(new == 0){ ! 203: freeb(bp); ! 204: return 0; ! 205: } ! 206: ! 207: /* ! 208: * copy bytes into new block ! 209: */ ! 210: for(nbp = bp; len>0 && nbp; nbp = nbp->next){ ! 211: delim = nbp->flags & S_DELIM; ! 212: i = BLEN(nbp); ! 213: if(i > len) { ! 214: memmove(new->wptr, nbp->rptr, len); ! 215: new->wptr += len; ! 216: break; ! 217: } else { ! 218: memmove(new->wptr, nbp->rptr, i); ! 219: new->wptr += i; ! 220: len -= i; ! 221: } ! 222: } ! 223: if(len){ ! 224: memset(new->wptr, 0, len); ! 225: new->wptr += len; ! 226: } ! 227: new->flags |= delim; ! 228: freeb(bp); ! 229: return new; ! 230: ! 231: } ! 232: ! 233: /* ! 234: * make a copy of the first 'count' bytes of a block chain. Use ! 235: * by transport protocols. ! 236: */ ! 237: Block * ! 238: copyb(Block *bp, int count) ! 239: { ! 240: Block *nb, *head, **p; ! 241: int l; ! 242: ! 243: p = &head; ! 244: while(count) { ! 245: l = BLEN(bp); ! 246: if(count < l) ! 247: l = count; ! 248: nb = allocb(l); ! 249: if(nb == 0) ! 250: panic("copyb.1"); ! 251: memmove(nb->wptr, bp->rptr, l); ! 252: nb->wptr += l; ! 253: count -= l; ! 254: if(bp->flags & S_DELIM) ! 255: nb->flags |= S_DELIM; ! 256: *p = nb; ! 257: p = &nb->next; ! 258: bp = bp->next; ! 259: if(bp == 0) ! 260: break; ! 261: } ! 262: if(count) { ! 263: nb = allocb(count); ! 264: if(nb == 0) ! 265: panic("copyb.2"); ! 266: memset(nb->wptr, 0, count); ! 267: nb->wptr += count; ! 268: nb->flags |= S_DELIM; ! 269: *p = nb; ! 270: } ! 271: if(blen(head) == 0) ! 272: print("copyb: zero length\n"); ! 273: ! 274: return head; ! 275: } ! 276: ! 277: /* ! 278: * Part 2) Queues ! 279: */ ! 280: ! 281: /* ! 282: * process end line discipline ! 283: */ ! 284: static Streamput stputq; ! 285: Qinfo procinfo = ! 286: { ! 287: stputq, ! 288: nullput, ! 289: 0, ! 290: 0, ! 291: "process" ! 292: }; ! 293: ! 294: /* ! 295: * line disciplines that can be pushed ! 296: */ ! 297: static Qinfo *lds; ! 298: ! 299: /* ! 300: * make known a stream module and call its initialization routine, if ! 301: * it has one. ! 302: */ ! 303: void ! 304: newqinfo(Qinfo *qi) ! 305: { ! 306: if(qi->next) ! 307: panic("newqinfo: already configured"); ! 308: ! 309: qi->next = lds; ! 310: lds = qi; ! 311: if(qi->reset) ! 312: (*qi->reset)(); ! 313: } ! 314: ! 315: /* ! 316: * find the info structure for line discipline 'name' ! 317: */ ! 318: Qinfo * ! 319: qinfofind(char *name) ! 320: { ! 321: Qinfo *qi; ! 322: ! 323: if(name == 0) ! 324: return 0; ! 325: for(qi = lds; qi; qi = qi->next) ! 326: if(strcmp(qi->name, name)==0) ! 327: return qi; ! 328: return 0; ! 329: } ! 330: ! 331: /* ! 332: * allocate a pair of queues. flavor them with the requested put routines. ! 333: * the `QINUSE' flag on the read side is the only one used. ! 334: */ ! 335: static Queue * ! 336: allocq(Qinfo *qi) ! 337: { ! 338: Queue *q, *wq; ! 339: ! 340: q = smalloc(2*sizeof(Queue)); ! 341: ! 342: q->flag = QINUSE; ! 343: q->r.p = 0; ! 344: q->info = qi; ! 345: q->put = qi->iput; ! 346: q->len = q->nb = 0; ! 347: q->ptr = 0; ! 348: q->rp = &q->r; ! 349: wq = q->other = q + 1; ! 350: ! 351: wq->flag = QINUSE; ! 352: wq->r.p = 0; ! 353: wq->info = qi; ! 354: wq->put = qi->oput; ! 355: wq->other = q; ! 356: wq->ptr = 0; ! 357: wq->len = wq->nb = 0; ! 358: wq->rp = &wq->r; ! 359: ! 360: return q; ! 361: } ! 362: ! 363: /* ! 364: * free a queue ! 365: */ ! 366: static void ! 367: freeq(Queue *q) ! 368: { ! 369: Block *bp; ! 370: ! 371: q = RD(q); ! 372: while(bp = getq(q)) ! 373: freeb(bp); ! 374: q = WR(q); ! 375: while(bp = getq(q)) ! 376: freeb(bp); ! 377: free(RD(q)); ! 378: } ! 379: ! 380: /* ! 381: * flush a queue ! 382: */ ! 383: static void ! 384: flushq(Queue *q) ! 385: { ! 386: Block *bp; ! 387: ! 388: q = RD(q); ! 389: while(bp = getq(q)) ! 390: freeb(bp); ! 391: q = WR(q); ! 392: while(bp = getq(q)) ! 393: freeb(bp); ! 394: } ! 395: ! 396: /* ! 397: * push a queue onto a stream referenced by the proc side write q ! 398: */ ! 399: Queue * ! 400: pushq(Stream* s, Qinfo *qi) ! 401: { ! 402: Queue *q; ! 403: Queue *nq; ! 404: ! 405: q = RD(s->procq); ! 406: ! 407: /* ! 408: * make the new queue ! 409: */ ! 410: nq = allocq(qi); ! 411: ! 412: /* ! 413: * push ! 414: */ ! 415: qlock(s); ! 416: RD(nq)->next = q; ! 417: RD(WR(q)->next)->next = RD(nq); ! 418: WR(nq)->next = WR(q)->next; ! 419: WR(q)->next = WR(nq); ! 420: qunlock(s); ! 421: ! 422: if(qi->open) ! 423: (*qi->open)(RD(nq), s); ! 424: ! 425: return WR(nq)->next; ! 426: } ! 427: ! 428: /* ! 429: * pop off the top line discipline ! 430: */ ! 431: static void ! 432: popq(Stream *s) ! 433: { ! 434: Queue *q; ! 435: ! 436: if(waserror()){ ! 437: qunlock(s); ! 438: nexterror(); ! 439: } ! 440: qlock(s); ! 441: if(s->procq->next == WR(s->devq)) ! 442: error(Ebadld); ! 443: q = s->procq->next; ! 444: if(q->info->close) ! 445: (*q->info->close)(RD(q)); ! 446: s->procq->next = q->next; ! 447: RD(q->next)->next = RD(s->procq); ! 448: qunlock(s); ! 449: freeq(q); ! 450: } ! 451: ! 452: /* ! 453: * add a block (or list of blocks) to the end of a queue. return true ! 454: * if one of the blocks contained a delimiter. ! 455: */ ! 456: int ! 457: putq(Queue *q, Block *bp) ! 458: { ! 459: int delim; ! 460: ! 461: lock(q); ! 462: if(q->first) ! 463: q->last->next = bp; ! 464: else ! 465: q->first = bp; ! 466: q->len += BLEN(bp); ! 467: q->nb++; ! 468: delim = bp->flags & S_DELIM; ! 469: while(bp->next) { ! 470: bp = bp->next; ! 471: q->len += BLEN(bp); ! 472: q->nb++; ! 473: delim |= bp->flags & S_DELIM; ! 474: } ! 475: q->last = bp; ! 476: if(q->len >= Streamhi || q->nb >= Streambhi) ! 477: q->flag |= QHIWAT; ! 478: unlock(q); ! 479: return delim; ! 480: } ! 481: ! 482: int ! 483: putb(Blist *q, Block *bp) ! 484: { ! 485: int delim; ! 486: ! 487: if(q->first) ! 488: q->last->next = bp; ! 489: else ! 490: q->first = bp; ! 491: q->len += BLEN(bp); ! 492: delim = bp->flags & S_DELIM; ! 493: while(bp->next) { ! 494: bp = bp->next; ! 495: q->len += BLEN(bp); ! 496: delim |= bp->flags & S_DELIM; ! 497: } ! 498: q->last = bp; ! 499: return delim; ! 500: } ! 501: ! 502: /* ! 503: * add a block to the start of a queue ! 504: */ ! 505: void ! 506: putbq(Blist *q, Block *bp) ! 507: { ! 508: lock(q); ! 509: if(q->first) ! 510: bp->next = q->first; ! 511: else ! 512: q->last = bp; ! 513: q->first = bp; ! 514: q->len += BLEN(bp); ! 515: q->nb++; ! 516: unlock(q); ! 517: } ! 518: ! 519: /* ! 520: * remove the first block from a queue ! 521: */ ! 522: Block * ! 523: getq(Queue *q) ! 524: { ! 525: Block *bp; ! 526: ! 527: lock(q); ! 528: bp = q->first; ! 529: if(bp) { ! 530: q->first = bp->next; ! 531: if(q->first == 0) ! 532: q->last = 0; ! 533: q->len -= BLEN(bp); ! 534: q->nb--; ! 535: if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){ ! 536: wakeup(q->other->next->other->rp); ! 537: q->flag &= ~QHIWAT; ! 538: } ! 539: bp->next = 0; ! 540: } ! 541: unlock(q); ! 542: return bp; ! 543: } ! 544: ! 545: /* ! 546: * grab all the blocks in a queue ! 547: */ ! 548: Block * ! 549: grabq(Queue *q) ! 550: { ! 551: Block *bp; ! 552: ! 553: lock(q); ! 554: bp = q->first; ! 555: if(bp){ ! 556: q->first = 0; ! 557: q->last = 0; ! 558: q->len = 0; ! 559: q->nb = 0; ! 560: if(q->flag&QHIWAT){ ! 561: wakeup(q->other->next->other->rp); ! 562: q->flag &= ~QHIWAT; ! 563: } ! 564: } ! 565: unlock(q); ! 566: return bp; ! 567: } ! 568: ! 569: /* ! 570: * remove the first block from a list of blocks ! 571: */ ! 572: Block * ! 573: getb(Blist *q) ! 574: { ! 575: Block *bp; ! 576: ! 577: bp = q->first; ! 578: if(bp) { ! 579: q->first = bp->next; ! 580: if(q->first == 0) ! 581: q->last = 0; ! 582: q->len -= BLEN(bp); ! 583: bp->next = 0; ! 584: } ! 585: return bp; ! 586: } ! 587: ! 588: ! 589: /* ! 590: * put a block into the bit bucket ! 591: */ ! 592: void ! 593: nullput(Queue *q, Block *bp) ! 594: { ! 595: USED(q); ! 596: if(bp->type == M_HANGUP) ! 597: freeb(bp); ! 598: else { ! 599: freeb(bp); ! 600: error(Ehungup); ! 601: } ! 602: } ! 603: ! 604: /* ! 605: * Part 3) Streams ! 606: */ ! 607: ! 608: /* ! 609: * the per stream directory structure ! 610: */ ! 611: Dirtab streamdir[]={ ! 612: "data", {Sdataqid}, 0, 0600, ! 613: "ctl", {Sctlqid}, 0, 0600, ! 614: }; ! 615: ! 616: /* ! 617: * hash buckets containing all streams ! 618: */ ! 619: enum ! 620: { ! 621: Nbits= 5, ! 622: Nhash= 1<<Nbits, ! 623: Nmask= Nhash-1, ! 624: }; ! 625: typedef struct Sthash Sthash; ! 626: struct Sthash ! 627: { ! 628: QLock; ! 629: Stream *s; ! 630: }; ! 631: static Sthash ht[Nhash]; ! 632: ! 633: static void hangup(Stream*); ! 634: ! 635: /* ! 636: * A stream device consists of the contents of streamdir plus ! 637: * any directory supplied by the actual device. ! 638: * ! 639: * values of s: ! 640: * 0 to ntab-1 apply to the auxiliary directory. ! 641: * ntab to ntab+Shighqid-Slowqid+1 apply to streamdir. ! 642: */ ! 643: int ! 644: streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp) ! 645: { ! 646: if(s < ntab) ! 647: tab = &tab[s]; ! 648: else if(s < ntab + Shighqid - Slowqid + 1) ! 649: tab = &streamdir[s - ntab]; ! 650: else ! 651: return -1; ! 652: ! 653: devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, ! 654: tab->name, tab->length, eve, tab->perm, dp); ! 655: return 1; ! 656: } ! 657: ! 658: /* ! 659: * return a hash bucket for a stream ! 660: */ ! 661: static Sthash* ! 662: hash(int type, int dev, int id) ! 663: { ! 664: return &ht[(type*7*7 + dev*7 + id) & Nmask]; ! 665: } ! 666: ! 667: /* ! 668: * create a new stream, if noopen is non-zero, don't increment the open count ! 669: */ ! 670: Stream * ! 671: streamnew(ushort type, ushort dev, ulong id, Qinfo *qi, int noopen) ! 672: { ! 673: Stream *s; ! 674: Queue *q; ! 675: Sthash *hb; ! 676: ! 677: hb = hash(type, dev, id); ! 678: ! 679: /* ! 680: * if the stream already exists, just increment the reference counts. ! 681: */ ! 682: qlock(hb); ! 683: for(s = hb->s; s; s = s->next) { ! 684: if(s->type == type && s->dev == dev && s->id == id){ ! 685: s->inuse++; ! 686: qunlock(hb); ! 687: if(noopen == 0){ ! 688: qlock(s); ! 689: s->opens++; ! 690: qunlock(s); ! 691: } ! 692: return s; ! 693: } ! 694: } ! 695: ! 696: /* ! 697: * create and init a new stream ! 698: */ ! 699: s = smalloc(sizeof(Stream)); ! 700: s->inuse = 1; ! 701: s->type = type; ! 702: s->dev = dev; ! 703: s->id = id; ! 704: s->err = 0; ! 705: s->hread = 0; ! 706: s->next = hb->s; ! 707: hb->s = s; ! 708: ! 709: /* ! 710: * The ordering of these 2 instructions is very important. ! 711: * It makes sure we finish the stream initialization before ! 712: * anyone else can access it. ! 713: */ ! 714: qlock(s); ! 715: qunlock(hb); ! 716: ! 717: if(waserror()){ ! 718: qunlock(s); ! 719: streamclose1(s); ! 720: nexterror(); ! 721: } ! 722: ! 723: /* ! 724: * hang a device and process q off the stream ! 725: */ ! 726: if(noopen) ! 727: s->opens = 0; ! 728: else ! 729: s->opens = 1; ! 730: q = allocq(&procinfo); ! 731: WR(q)->ptr = s; ! 732: RD(q)->ptr = s; ! 733: s->procq = WR(q); ! 734: q = allocq(qi); ! 735: s->devq = RD(q); ! 736: WR(s->procq)->next = WR(s->devq); ! 737: RD(s->procq)->next = 0; ! 738: RD(s->devq)->next = RD(s->procq); ! 739: WR(s->devq)->next = 0; ! 740: ! 741: if(qi->open) ! 742: (*qi->open)(RD(s->devq), s); ! 743: ! 744: qunlock(s); ! 745: poperror(); ! 746: return s; ! 747: } ! 748: ! 749: /* ! 750: * Associate a stream with a channel ! 751: */ ! 752: void ! 753: streamopen(Chan *c, Qinfo *qi) ! 754: { ! 755: c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0); ! 756: } ! 757: ! 758: /* ! 759: * Enter a stream only if the stream exists and is open. Increment the ! 760: * reference count so it can't disappear under foot. ! 761: * ! 762: * Return -1 if the stream no longer exists or is not opened. ! 763: */ ! 764: int ! 765: streamenter(Stream *s) ! 766: { ! 767: Sthash *hb; ! 768: Stream *ns; ! 769: ! 770: hb = hash(s->type, s->dev, s->id); ! 771: qlock(hb); ! 772: for(ns = hb->s; ns; ns = ns->next) ! 773: if(s->type == ns->type && s->dev == ns->dev && s->id == ns->id){ ! 774: s->inuse++; ! 775: qunlock(hb); ! 776: if(s->opens == 0){ ! 777: streamexit(s); ! 778: return -1; ! 779: } ! 780: return 0; ! 781: } ! 782: qunlock(hb); ! 783: return -1; ! 784: } ! 785: ! 786: /* ! 787: * Decrement the reference count on a stream. If the count is ! 788: * zero, free the stream. ! 789: */ ! 790: void ! 791: streamexit(Stream *s) ! 792: { ! 793: Queue *q; ! 794: Queue *nq; ! 795: Sthash *hb; ! 796: Stream **l, *ns; ! 797: ! 798: hb = hash(s->type, s->dev, s->id); ! 799: qlock(hb); ! 800: if(s->inuse-- == 1){ ! 801: if(s->opens != 0) ! 802: panic("streamexit %d %s\n", s->opens, s->devq->info->name); ! 803: ! 804: /* ! 805: * ascend the stream freeing the queues ! 806: */ ! 807: for(q = s->devq; q; q = nq){ ! 808: nq = q->next; ! 809: freeq(q); ! 810: } ! 811: if(s->err) ! 812: freeb(s->err); ! 813: ! 814: /* ! 815: * unchain it from the hash bucket and free ! 816: */ ! 817: l = &hb->s; ! 818: for(ns = hb->s; ns; ns = ns->next){ ! 819: if(s == ns){ ! 820: *l = s->next; ! 821: break; ! 822: } ! 823: l = &ns->next; ! 824: } ! 825: free(s); ! 826: } ! 827: qunlock(hb); ! 828: } ! 829: ! 830: /* ! 831: * nail down a stream so that it can't be closed ! 832: */ ! 833: void ! 834: naildownstream(Stream *s) ! 835: { ! 836: s->opens++; ! 837: s->inuse++; ! 838: } ! 839: ! 840: /* ! 841: * Decrement the open count. When it goes to zero, call the close ! 842: * routines for each queue in the stream. ! 843: */ ! 844: int ! 845: streamclose1(Stream *s) ! 846: { ! 847: Queue *q, *nq; ! 848: int rv; ! 849: ! 850: /* ! 851: * decrement the open count ! 852: */ ! 853: qlock(s); ! 854: if(s->opens-- == 1){ ! 855: /* ! 856: * descend the stream closing the queues ! 857: */ ! 858: for(q = s->procq; q; q = q->next){ ! 859: if(!waserror()){ ! 860: if(q->info->close) ! 861: (*q->info->close)(q->other); ! 862: poperror(); ! 863: } ! 864: WR(q)->put = nullput; ! 865: ! 866: /* ! 867: * this may be 2 streams joined device end to device end ! 868: */ ! 869: if(q == s->devq->other) ! 870: break; ! 871: } ! 872: ! 873: /* ! 874: * ascend the stream flushing the queues ! 875: */ ! 876: for(q = s->devq; q; q = nq){ ! 877: nq = q->next; ! 878: flushq(q); ! 879: } ! 880: } ! 881: rv = s->opens; ! 882: qunlock(s); ! 883: ! 884: /* ! 885: * leave it and free it ! 886: */ ! 887: streamexit(s); ! 888: return rv; ! 889: } ! 890: int ! 891: streamclose(Chan *c) ! 892: { ! 893: /* ! 894: * if no stream, ignore it ! 895: */ ! 896: if(!c->stream) ! 897: return 0; ! 898: return streamclose1(c->stream); ! 899: } ! 900: ! 901: /* ! 902: * put a block to be read into the queue. wakeup any waiting reader ! 903: */ ! 904: void ! 905: stputq(Queue *q, Block *bp) ! 906: { ! 907: int awaken; ! 908: Stream *s; ! 909: ! 910: if(bp->type == M_HANGUP){ ! 911: s = q->ptr; ! 912: if(bp->rptr<bp->wptr && s->err==0) ! 913: s->err = bp; ! 914: else ! 915: freeb(bp); ! 916: q->flag |= QHUNGUP; ! 917: q->other->flag |= QHUNGUP; ! 918: wakeup(q->other->rp); ! 919: awaken = 1; ! 920: } else { ! 921: lock(q); ! 922: if(q->first) ! 923: q->last->next = bp; ! 924: else ! 925: q->first = bp; ! 926: q->len += BLEN(bp); ! 927: q->nb++; ! 928: awaken = bp->flags & S_DELIM; ! 929: while(bp->next) { ! 930: bp = bp->next; ! 931: q->len += BLEN(bp); ! 932: q->nb++; ! 933: awaken |= bp->flags & S_DELIM; ! 934: } ! 935: q->last = bp; ! 936: if(q->len >= Streamhi || q->nb >= Streambhi){ ! 937: q->flag |= QHIWAT; ! 938: awaken = 1; ! 939: } ! 940: unlock(q); ! 941: } ! 942: if(awaken) ! 943: wakeup(q->rp); ! 944: } ! 945: ! 946: /* ! 947: * return the stream id ! 948: */ ! 949: long ! 950: streamctlread(Chan *c, void *vbuf, long n) ! 951: { ! 952: char *buf = vbuf; ! 953: char num[32]; ! 954: Stream *s; ! 955: ! 956: s = c->stream; ! 957: if(STREAMTYPE(c->qid.path) == Sctlqid){ ! 958: sprint(num, "%d", s->id); ! 959: return readstr(c->offset, buf, n, num); ! 960: } else { ! 961: if(CHDIR & c->qid.path) ! 962: return devdirread(c, vbuf, n, 0, 0, streamgen); ! 963: else ! 964: panic("streamctlread"); ! 965: } ! 966: return 0; /* not reached */ ! 967: } ! 968: ! 969: /* ! 970: * return true if there is an output buffer available ! 971: */ ! 972: static int ! 973: isinput(void *x) ! 974: { ! 975: Queue *q; ! 976: ! 977: q = (Queue *)x; ! 978: return (q->flag&QHUNGUP) || q->first!=0; ! 979: } ! 980: ! 981: /* ! 982: * read until we fill the buffer or until a DELIM is encountered ! 983: */ ! 984: long ! 985: streamread(Chan *c, void *vbuf, long n) ! 986: { ! 987: Block *bp; ! 988: Block *tofree; ! 989: Stream *s; ! 990: Queue *q; ! 991: int left, i; ! 992: uchar *buf = vbuf; ! 993: ! 994: if(STREAMTYPE(c->qid.path) != Sdataqid) ! 995: return streamctlread(c, vbuf, n); ! 996: ! 997: /* ! 998: * one reader at a time ! 999: */ ! 1000: s = c->stream; ! 1001: left = n; ! 1002: qlock(&s->rdlock); ! 1003: tofree = 0; ! 1004: q = 0; ! 1005: if(waserror()){ ! 1006: /* ! 1007: * put any partially read message back into the ! 1008: * queue ! 1009: */ ! 1010: while(tofree){ ! 1011: bp = tofree; ! 1012: tofree = bp->next; ! 1013: bp->next = 0; ! 1014: putbq(q, bp); ! 1015: } ! 1016: qunlock(&s->rdlock); ! 1017: nexterror(); ! 1018: } ! 1019: ! 1020: /* ! 1021: * sleep till data is available ! 1022: */ ! 1023: q = RD(s->procq); ! 1024: while(left){ ! 1025: bp = getq(q); ! 1026: if(bp == 0){ ! 1027: if(q->flag & QHUNGUP){ ! 1028: if(s->err) ! 1029: error((char*)s->err->rptr); ! 1030: else if(s->hread++<3) ! 1031: break; ! 1032: else ! 1033: error(Ehungup); ! 1034: } ! 1035: q->rp = &q->r; ! 1036: sleep(q->rp, isinput, (void *)q); ! 1037: continue; ! 1038: } ! 1039: ! 1040: i = BLEN(bp); ! 1041: if(i <= left){ ! 1042: memmove(buf, bp->rptr, i); ! 1043: left -= i; ! 1044: buf += i; ! 1045: bp->next = tofree; ! 1046: tofree = bp; ! 1047: if(bp->flags & S_DELIM) ! 1048: break; ! 1049: } else { ! 1050: memmove(buf, bp->rptr, left); ! 1051: bp->rptr += left; ! 1052: putbq(q, bp); ! 1053: left = 0; ! 1054: } ! 1055: } ! 1056: ! 1057: /* ! 1058: * free completely read blocks ! 1059: */ ! 1060: if(tofree) ! 1061: freeb(tofree); ! 1062: ! 1063: qunlock(&s->rdlock); ! 1064: poperror(); ! 1065: return n - left; ! 1066: } ! 1067: ! 1068: /* ! 1069: * look for an instance of the line discipline `name' on ! 1070: * the stream `s' ! 1071: */ ! 1072: void ! 1073: qlook(Stream *s, char *name) ! 1074: { ! 1075: Queue *q; ! 1076: ! 1077: for(q = s->procq; q; q = q->next){ ! 1078: if(strcmp(q->info->name, name) == 0) ! 1079: return; ! 1080: ! 1081: /* ! 1082: * this may be 2 streams joined device end to device end ! 1083: */ ! 1084: if(q == s->devq->other) ! 1085: break; ! 1086: } ! 1087: error(Ebadarg); ! 1088: } ! 1089: ! 1090: /* ! 1091: * Handle a ctl request. Streamwide requests are: ! 1092: * ! 1093: * hangup -- send an M_HANGUP up the stream ! 1094: * push ldname -- push the line discipline named ldname ! 1095: * pop -- pop a line discipline ! 1096: * look ldname -- look for a line discipline ! 1097: * ! 1098: * This routing is entered with s->wrlock'ed and must unlock. ! 1099: */ ! 1100: static long ! 1101: streamctlwrite(Chan *c, void *a, long n) ! 1102: { ! 1103: Qinfo *qi; ! 1104: Block *bp; ! 1105: Stream *s; ! 1106: ! 1107: if(STREAMTYPE(c->qid.path) != Sctlqid) ! 1108: panic("streamctlwrite %lux", c->qid); ! 1109: s = c->stream; ! 1110: ! 1111: /* ! 1112: * package ! 1113: */ ! 1114: bp = allocb(n+1); ! 1115: memmove(bp->wptr, a, n); ! 1116: bp->wptr[n] = 0; ! 1117: bp->wptr += n + 1; ! 1118: ! 1119: /* ! 1120: * check for standard requests ! 1121: */ ! 1122: if(streamparse("hangup", bp)){ ! 1123: hangup(s); ! 1124: freeb(bp); ! 1125: } else if(streamparse("push", bp)){ ! 1126: qi = qinfofind((char *)bp->rptr); ! 1127: if(qi == 0) ! 1128: error(Ebadld); ! 1129: pushq(s, qi); ! 1130: freeb(bp); ! 1131: } else if(streamparse("pop", bp)){ ! 1132: popq(s); ! 1133: freeb(bp); ! 1134: } else if(streamparse("look", bp)){ ! 1135: qlook(s, (char *)bp->rptr); ! 1136: freeb(bp); ! 1137: } else { ! 1138: bp->type = M_CTL; ! 1139: bp->flags |= S_DELIM; ! 1140: PUTNEXT(s->procq, bp); ! 1141: } ! 1142: ! 1143: return n; ! 1144: } ! 1145: ! 1146: /* ! 1147: * wait till there's room in the next stream ! 1148: */ ! 1149: static int ! 1150: notfull(void *arg) ! 1151: { ! 1152: return !QFULL((Queue *)arg); ! 1153: } ! 1154: void ! 1155: flowctl(Queue *q, Block *bp) ! 1156: { ! 1157: if(bp->type != M_HANGUP){ ! 1158: qlock(&q->rlock); ! 1159: if(waserror()){ ! 1160: qunlock(&q->rlock); ! 1161: freeb(bp); ! 1162: nexterror(); ! 1163: } ! 1164: q->rp = &q->r; ! 1165: sleep(q->rp, notfull, q->next); ! 1166: qunlock(&q->rlock); ! 1167: poperror(); ! 1168: } ! 1169: PUTNEXT(q, bp); ! 1170: } ! 1171: ! 1172: /* ! 1173: * send the request as a single delimited block ! 1174: */ ! 1175: long ! 1176: streamwrite(Chan *c, void *a, long n, int docopy) ! 1177: { ! 1178: Stream *s; ! 1179: Queue *q; ! 1180: long rem; ! 1181: int i; ! 1182: Block *bp; ! 1183: char *va; ! 1184: ! 1185: /* ! 1186: * docopy will get used if I ever figure out when to avoid copying ! 1187: * data. -- presotto ! 1188: */ ! 1189: USED(docopy); ! 1190: ! 1191: s = c->stream; ! 1192: ! 1193: /* ! 1194: * decode the qid ! 1195: */ ! 1196: if(STREAMTYPE(c->qid.path) != Sdataqid) ! 1197: return streamctlwrite(c, a, n); ! 1198: ! 1199: /* ! 1200: * No writes allowed on hungup channels ! 1201: */ ! 1202: q = s->procq; ! 1203: if(q->other->flag & QHUNGUP){ ! 1204: if(s->err) ! 1205: error((char*)(s->err->rptr)); ! 1206: else ! 1207: error(Ehungup); ! 1208: } ! 1209: ! 1210: /* ! 1211: * Write the message using blocks <= Streamhi bytes longs ! 1212: */ ! 1213: va = a; ! 1214: rem = n; ! 1215: for(;;){ ! 1216: if(rem > Streamhi) ! 1217: i = Streamhi; ! 1218: else ! 1219: i = rem; ! 1220: bp = allocb(i); ! 1221: memmove(bp->wptr, va, i); ! 1222: bp->wptr += i; ! 1223: va += i; ! 1224: rem -= i; ! 1225: if(rem > 0){ ! 1226: FLOWCTL(q, bp); ! 1227: } else { ! 1228: bp->flags |= S_DELIM; ! 1229: FLOWCTL(q, bp); ! 1230: break; ! 1231: } ! 1232: } ! 1233: return n; ! 1234: } ! 1235: ! 1236: /* ! 1237: * stat a stream. the length is the number of bytes up to the ! 1238: * first delimiter. ! 1239: */ ! 1240: void ! 1241: streamstat(Chan *c, char *db, char *name, long perm) ! 1242: { ! 1243: Dir dir; ! 1244: Stream *s; ! 1245: Queue *q; ! 1246: Block *bp; ! 1247: long n; ! 1248: ! 1249: s = c->stream; ! 1250: n = 0; ! 1251: if(s) { ! 1252: q = RD(s->procq); ! 1253: if(q->flag & QHUNGUP) ! 1254: error(Ehungup); ! 1255: lock(q); ! 1256: for(bp=q->first; bp; bp = bp->next){ ! 1257: n += BLEN(bp); ! 1258: if(bp->flags&S_DELIM) ! 1259: break; ! 1260: } ! 1261: unlock(q); ! 1262: } ! 1263: ! 1264: devdir(c, c->qid, name, n, eve, perm, &dir); ! 1265: convD2M(&dir, db); ! 1266: } ! 1267: ! 1268: /* ! 1269: * send a hangup up a stream ! 1270: */ ! 1271: static void ! 1272: hangup(Stream *s) ! 1273: { ! 1274: Block *bp; ! 1275: ! 1276: bp = allocb(0); ! 1277: bp->type = M_HANGUP; ! 1278: if(s->devq && s->devq->put) ! 1279: (*s->devq->put)(s->devq, bp); ! 1280: else ! 1281: freeb(bp); ! 1282: } ! 1283: ! 1284: /* ! 1285: * parse a string and return a pointer to the second element if the ! 1286: * first matches name. bp->rptr will be updated to point to the ! 1287: * second element. ! 1288: * ! 1289: * return 0 if no match. ! 1290: * ! 1291: * it is assumed that the block data is null terminated. streamwrite ! 1292: * guarantees this. ! 1293: */ ! 1294: int ! 1295: streamparse(char *name, Block *bp) ! 1296: { ! 1297: int len; ! 1298: ! 1299: len = strlen(name); ! 1300: if(BLEN(bp) < len) ! 1301: return 0; ! 1302: if(strncmp(name, (char *)bp->rptr, len)==0){ ! 1303: if(bp->rptr[len] == ' ') ! 1304: bp->rptr += len+1; ! 1305: else if(bp->rptr[len]) ! 1306: return 0; ! 1307: else ! 1308: bp->rptr += len; ! 1309: while(*bp->rptr==' ' && bp->wptr>bp->rptr) ! 1310: bp->rptr++; ! 1311: return 1; ! 1312: } ! 1313: return 0; ! 1314: } ! 1315: ! 1316: /* ! 1317: * like andrew's getmfields but no hidden state ! 1318: */ ! 1319: int ! 1320: getfields(char *lp, char **fields, int n, char *sep) ! 1321: { ! 1322: int i; ! 1323: ! 1324: for(i=0; lp && *lp && i<n; i++){ ! 1325: while(*lp && strchr(sep, *lp) != 0) ! 1326: *lp++=0; ! 1327: if(*lp == 0) ! 1328: break; ! 1329: fields[i]=lp; ! 1330: while(*lp && strchr(sep, *lp) == 0) ! 1331: lp++; ! 1332: } ! 1333: return i; ! 1334: } ! 1335: ! 1336: static Streamopen permstopen; ! 1337: static Streamput permstput; ! 1338: ! 1339: /* ! 1340: * pushing this line discipline makes the stream unclosable, i.e., always there ! 1341: */ ! 1342: Qinfo perminfo = ! 1343: { ! 1344: permstput, ! 1345: permstput, ! 1346: permstopen, ! 1347: 0, ! 1348: "permanent" ! 1349: }; ! 1350: ! 1351: static void ! 1352: permstopen(Queue *q, Stream *s) ! 1353: { ! 1354: USED(q); ! 1355: s->opens++; ! 1356: s->inuse++; ! 1357: } ! 1358: ! 1359: static void ! 1360: permstput(Queue *q, Block *bp) ! 1361: { ! 1362: PUTNEXT(q, bp); ! 1363: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.