Annotation of lucent/sys/src/9/port/stream.c, revision 1.1

1.1     ! root        1: #include       "u.h"
        !             2: #include       "../port/lib.h"
        !             3: #include       "mem.h"
        !             4: #include       "dat.h"
        !             5: #include       "fns.h"
        !             6: #include       "io.h"
        !             7: #include       "../port/error.h"
        !             8: #include       "devtab.h"
        !             9: 
        !            10: /*
        !            11:  *  Part 1) Blocks
        !            12:  */
        !            13: 
        !            14: /*
        !            15:  *  Allocate a block.  Put the data portion at the end of the smalloc'd
        !            16:  *  chunk so that it can easily grow from the front to add protocol
        !            17:  *  headers.  Thank Larry Peterson for the suggestion.
        !            18:  */
        !            19: Block *
        !            20: allocb(ulong size)
        !            21: {
        !            22:        Block *bp;
        !            23:        uchar *base, *lim;
        !            24:        int n;
        !            25: 
        !            26:        bp = smalloc(sizeof(Block)+size);
        !            27: 
        !            28:        base = (uchar*)bp + sizeof(Block);
        !            29:        n = msize(bp);
        !            30:        lim = (uchar*)bp + n;
        !            31:        n -= size + sizeof(Block);
        !            32:        if(n > 0)
        !            33:                memset(lim - n, 0, n);
        !            34:        bp->wptr = bp->rptr = lim - size;
        !            35:        bp->base = base;
        !            36:        bp->lim = lim;
        !            37:        bp->flags = 0;
        !            38:        bp->next = 0;
        !            39:        bp->list = 0;
        !            40:        bp->type = M_DATA;
        !            41:        bp->pc = getcallerpc(((uchar*)&size) - sizeof(size));
        !            42:        return bp;
        !            43: }
        !            44: 
        !            45: /*
        !            46:  *  Free a block (or list of blocks).  Poison its pointers so that
        !            47:  *  someone trying to access it after freeing will cause a panic.
        !            48:  */
        !            49: void
        !            50: freeb(Block *bp)
        !            51: {
        !            52:        Block *next;
        !            53: 
        !            54:        while(bp){
        !            55:                bp->rptr = 0;
        !            56:                bp->wptr = 0;
        !            57:                next = bp->next;
        !            58:                free(bp);
        !            59:                bp = next;      
        !            60:        }
        !            61: }
        !            62: 
        !            63: /*
        !            64:  *  Pad a block to the front with n bytes.  This is used to add protocol
        !            65:  *  headers to the front of blocks.
        !            66:  */
        !            67: Block *
        !            68: padb(Block *bp, int n)
        !            69: {
        !            70:        Block *nbp;
        !            71: 
        !            72:        if(bp->base && bp->rptr-bp->base>=n){
        !            73:                bp->rptr -= n;
        !            74:                return bp;
        !            75:        } else {
        !            76:                nbp = allocb(n);
        !            77:                nbp->wptr = nbp->lim;
        !            78:                nbp->rptr = nbp->wptr - n;
        !            79:                nbp->next = bp;
        !            80:                return nbp;
        !            81:        }
        !            82: } 
        !            83: 
        !            84: /*
        !            85:  *  make sure the first block has n bytes
        !            86:  */
        !            87: Block *
        !            88: pullup(Block *bp, int n)
        !            89: {
        !            90:        Block *nbp;
        !            91:        int i;
        !            92: 
        !            93:        /*
        !            94:         *  this should almost always be true, the rest it
        !            95:         *  just for to avoid every caller checking.
        !            96:         */
        !            97:        if(BLEN(bp) >= n)
        !            98:                return bp;
        !            99: 
        !           100:        /*
        !           101:         *  if not enough room in the first block,
        !           102:         *  add another to the front of the list.
        !           103:         */
        !           104:        if(bp->lim - bp->rptr < n){
        !           105:                nbp = allocb(n);
        !           106:                nbp->next = bp;
        !           107:                bp = nbp;
        !           108:        }
        !           109: 
        !           110:        /*
        !           111:         *  copy bytes from the trailing blocks into the first
        !           112:         */
        !           113:        n -= BLEN(bp);
        !           114:        while(nbp = bp->next){
        !           115:                i = BLEN(nbp);
        !           116:                if(i >= n) {
        !           117:                        memmove(bp->wptr, nbp->rptr, n);
        !           118:                        bp->wptr += n;
        !           119:                        nbp->rptr += n;
        !           120:                        return bp;
        !           121:                } else {
        !           122:                        memmove(bp->wptr, nbp->rptr, i);
        !           123:                        bp->wptr += i;
        !           124:                        bp->next = nbp->next;
        !           125:                        nbp->next = 0;
        !           126:                        freeb(nbp);
        !           127:                        n -= i;
        !           128:                }
        !           129:        }
        !           130:        freeb(bp);
        !           131:        return 0;
        !           132: }
        !           133: 
        !           134: /*
        !           135:  *  return the number of data bytes of a list of blocks
        !           136:  */
        !           137: int
        !           138: blen(Block *bp)
        !           139: {
        !           140:        int len;
        !           141: 
        !           142:        len = 0;
        !           143:        while(bp) {
        !           144:                len += BLEN(bp);
        !           145:                bp = bp->next;
        !           146:        }
        !           147: 
        !           148:        return len;
        !           149: }
        !           150: 
        !           151: /*
        !           152:  *  round a block chain to some even number of bytes.  Used
        !           153:  *  by devip.c becuase all IP packets must have an even number
        !           154:  *  of bytes.
        !           155:  *
        !           156:  *  The last block in the returned chain will have S_DELIM set.
        !           157:  */
        !           158: int
        !           159: bround(Block *bp, int amount)
        !           160: {
        !           161:        Block *last;
        !           162:        int len, pad;
        !           163: 
        !           164:        len = 0;
        !           165:        SET(last);      /* Ken's magic */
        !           166: 
        !           167:        while(bp) {
        !           168:                len += BLEN(bp);
        !           169:                last = bp;
        !           170:                bp = bp->next;
        !           171:        }
        !           172: 
        !           173:        pad = ((len + amount) & ~amount) - len;
        !           174:        if(pad) {
        !           175:                if(last->lim - last->wptr >= pad){
        !           176:                        memset(last->wptr, 0, pad);
        !           177:                        last->wptr += pad;
        !           178:                } else {
        !           179:                        last->next = allocb(pad);
        !           180:                        last->flags &= ~S_DELIM;
        !           181:                        last = last->next;
        !           182:                        last->wptr += pad;
        !           183:                        last->flags |= S_DELIM;
        !           184:                }
        !           185:        }
        !           186: 
        !           187:        return len + pad;
        !           188: }
        !           189: 
        !           190: /*
        !           191:  *  expand a block list to be one block, len bytes long.  used by
        !           192:  *  ethernet routines.
        !           193:  */
        !           194: Block*
        !           195: expandb(Block *bp, int len)
        !           196: {
        !           197:        Block *nbp, *new;
        !           198:        int i;
        !           199:        ulong delim = 0;
        !           200: 
        !           201:        new = allocb(len);
        !           202:        if(new == 0){
        !           203:                freeb(bp);
        !           204:                return 0;
        !           205:        }
        !           206: 
        !           207:        /*
        !           208:         *  copy bytes into new block
        !           209:         */
        !           210:        for(nbp = bp; len>0 && nbp; nbp = nbp->next){
        !           211:                delim = nbp->flags & S_DELIM;
        !           212:                i = BLEN(nbp);
        !           213:                if(i > len) {
        !           214:                        memmove(new->wptr, nbp->rptr, len);
        !           215:                        new->wptr += len;
        !           216:                        break;
        !           217:                } else {
        !           218:                        memmove(new->wptr, nbp->rptr, i);
        !           219:                        new->wptr += i;
        !           220:                        len -= i;
        !           221:                }
        !           222:        }
        !           223:        if(len){
        !           224:                memset(new->wptr, 0, len);
        !           225:                new->wptr += len;
        !           226:        }
        !           227:        new->flags |= delim;
        !           228:        freeb(bp);
        !           229:        return new;
        !           230: 
        !           231: }
        !           232: 
        !           233: /*
        !           234:  *  make a copy of the first 'count' bytes of a block chain.  Use
        !           235:  *  by transport protocols.
        !           236:  */
        !           237: Block *
        !           238: copyb(Block *bp, int count)
        !           239: {
        !           240:        Block *nb, *head, **p;
        !           241:        int l;
        !           242: 
        !           243:        p = &head;
        !           244:        while(count) {
        !           245:                l = BLEN(bp);
        !           246:                if(count < l)
        !           247:                        l = count;
        !           248:                nb = allocb(l);
        !           249:                if(nb == 0)
        !           250:                        panic("copyb.1");
        !           251:                memmove(nb->wptr, bp->rptr, l);
        !           252:                nb->wptr += l;
        !           253:                count -= l;
        !           254:                if(bp->flags & S_DELIM)
        !           255:                        nb->flags |= S_DELIM;
        !           256:                *p = nb;
        !           257:                p = &nb->next;
        !           258:                bp = bp->next;
        !           259:                if(bp == 0)
        !           260:                        break;
        !           261:        }
        !           262:        if(count) {
        !           263:                nb = allocb(count);
        !           264:                if(nb == 0)
        !           265:                        panic("copyb.2");
        !           266:                memset(nb->wptr, 0, count);
        !           267:                nb->wptr += count;
        !           268:                nb->flags |= S_DELIM;
        !           269:                *p = nb;
        !           270:        }
        !           271:        if(blen(head) == 0)
        !           272:                print("copyb: zero length\n");
        !           273: 
        !           274:        return head;
        !           275: }
        !           276: 
        !           277: /*
        !           278:  *  Part 2) Queues
        !           279:  */
        !           280: 
        !           281: /*
        !           282:  *  process end line discipline
        !           283:  */
        !           284: static Streamput stputq;
        !           285: Qinfo procinfo =
        !           286: {
        !           287:        stputq,
        !           288:        nullput,
        !           289:        0,
        !           290:        0,
        !           291:        "process"
        !           292: };
        !           293: 
        !           294: /*
        !           295:  *  line disciplines that can be pushed
        !           296:  */
        !           297: static Qinfo *lds;
        !           298: 
        !           299: /*
        !           300:  *  make known a stream module and call its initialization routine, if
        !           301:  *  it has one.
        !           302:  */
        !           303: void
        !           304: newqinfo(Qinfo *qi)
        !           305: {
        !           306:        if(qi->next)
        !           307:                panic("newqinfo: already configured");
        !           308: 
        !           309:        qi->next = lds;
        !           310:        lds = qi;
        !           311:        if(qi->reset)
        !           312:                (*qi->reset)();
        !           313: }
        !           314: 
        !           315: /*
        !           316:  *  find the info structure for line discipline 'name'
        !           317:  */
        !           318: Qinfo *
        !           319: qinfofind(char *name)
        !           320: {
        !           321:        Qinfo *qi;
        !           322: 
        !           323:        if(name == 0)
        !           324:                return 0;
        !           325:        for(qi = lds; qi; qi = qi->next)
        !           326:                if(strcmp(qi->name, name)==0)
        !           327:                        return qi;
        !           328:        return 0;
        !           329: }
        !           330: 
        !           331: /*
        !           332:  *  allocate a pair of queues.  flavor them with the requested put routines.
        !           333:  *  the `QINUSE' flag on the read side is the only one used.
        !           334:  */
        !           335: static Queue *
        !           336: allocq(Qinfo *qi)
        !           337: {
        !           338:        Queue *q, *wq;
        !           339: 
        !           340:        q = smalloc(2*sizeof(Queue));
        !           341: 
        !           342:        q->flag = QINUSE;
        !           343:        q->r.p = 0;
        !           344:        q->info = qi;
        !           345:        q->put = qi->iput;
        !           346:        q->len = q->nb = 0;
        !           347:        q->ptr = 0;
        !           348:        q->rp = &q->r;
        !           349:        wq = q->other = q + 1;
        !           350: 
        !           351:        wq->flag = QINUSE;
        !           352:        wq->r.p = 0;
        !           353:        wq->info = qi;
        !           354:        wq->put = qi->oput;
        !           355:        wq->other = q;
        !           356:        wq->ptr = 0;
        !           357:        wq->len = wq->nb = 0;
        !           358:        wq->rp = &wq->r;
        !           359: 
        !           360:        return q;
        !           361: }
        !           362: 
        !           363: /*
        !           364:  *  free a queue
        !           365:  */
        !           366: static void
        !           367: freeq(Queue *q)
        !           368: {
        !           369:        Block *bp;
        !           370: 
        !           371:        q = RD(q);
        !           372:        while(bp = getq(q))
        !           373:                freeb(bp);
        !           374:        q = WR(q);
        !           375:        while(bp = getq(q))
        !           376:                freeb(bp);
        !           377:        free(RD(q));
        !           378: }
        !           379: 
        !           380: /*
        !           381:  *  flush a queue
        !           382:  */
        !           383: static void
        !           384: flushq(Queue *q)
        !           385: {
        !           386:        Block *bp;
        !           387: 
        !           388:        q = RD(q);
        !           389:        while(bp = getq(q))
        !           390:                freeb(bp);
        !           391:        q = WR(q);
        !           392:        while(bp = getq(q))
        !           393:                freeb(bp);
        !           394: }
        !           395: 
        !           396: /*
        !           397:  *  push a queue onto a stream referenced by the proc side write q
        !           398:  */
        !           399: Queue *
        !           400: pushq(Stream* s, Qinfo *qi)
        !           401: {
        !           402:        Queue *q;
        !           403:        Queue *nq;
        !           404: 
        !           405:        q = RD(s->procq);
        !           406: 
        !           407:        /*
        !           408:         *  make the new queue
        !           409:         */
        !           410:        nq = allocq(qi);
        !           411: 
        !           412:        /*
        !           413:         *  push
        !           414:         */
        !           415:        qlock(s);
        !           416:        RD(nq)->next = q;
        !           417:        RD(WR(q)->next)->next = RD(nq);
        !           418:        WR(nq)->next = WR(q)->next;
        !           419:        WR(q)->next = WR(nq);
        !           420:        qunlock(s);
        !           421: 
        !           422:        if(qi->open)
        !           423:                (*qi->open)(RD(nq), s);
        !           424: 
        !           425:        return WR(nq)->next;
        !           426: }
        !           427: 
        !           428: /*
        !           429:  *  pop off the top line discipline
        !           430:  */
        !           431: static void
        !           432: popq(Stream *s)
        !           433: {
        !           434:        Queue *q;
        !           435: 
        !           436:        if(waserror()){
        !           437:                qunlock(s);
        !           438:                nexterror();
        !           439:        }
        !           440:        qlock(s);
        !           441:        if(s->procq->next == WR(s->devq))
        !           442:                error(Ebadld);
        !           443:        q = s->procq->next;
        !           444:        if(q->info->close)
        !           445:                (*q->info->close)(RD(q));
        !           446:        s->procq->next = q->next;
        !           447:        RD(q->next)->next = RD(s->procq);
        !           448:        qunlock(s);
        !           449:        freeq(q);
        !           450: }
        !           451: 
        !           452: /*
        !           453:  *  add a block (or list of blocks) to the end of a queue.  return true
        !           454:  *  if one of the blocks contained a delimiter. 
        !           455:  */
        !           456: int
        !           457: putq(Queue *q, Block *bp)
        !           458: {
        !           459:        int delim;
        !           460: 
        !           461:        lock(q);
        !           462:        if(q->first)
        !           463:                q->last->next = bp;
        !           464:        else
        !           465:                q->first = bp;
        !           466:        q->len += BLEN(bp);
        !           467:        q->nb++;
        !           468:        delim = bp->flags & S_DELIM;
        !           469:        while(bp->next) {
        !           470:                bp = bp->next;
        !           471:                q->len += BLEN(bp);
        !           472:                q->nb++;
        !           473:                delim |= bp->flags & S_DELIM;
        !           474:        }
        !           475:        q->last = bp;
        !           476:        if(q->len >= Streamhi || q->nb >= Streambhi)
        !           477:                q->flag |= QHIWAT;
        !           478:        unlock(q);
        !           479:        return delim;
        !           480: }
        !           481: 
        !           482: int
        !           483: putb(Blist *q, Block *bp)
        !           484: {
        !           485:        int delim;
        !           486: 
        !           487:        if(q->first)
        !           488:                q->last->next = bp;
        !           489:        else
        !           490:                q->first = bp;
        !           491:        q->len += BLEN(bp);
        !           492:        delim = bp->flags & S_DELIM;
        !           493:        while(bp->next) {
        !           494:                bp = bp->next;
        !           495:                q->len += BLEN(bp);
        !           496:                delim |= bp->flags & S_DELIM;
        !           497:        }
        !           498:        q->last = bp;
        !           499:        return delim;
        !           500: }
        !           501: 
        !           502: /*
        !           503:  *  add a block to the start of a queue 
        !           504:  */
        !           505: void
        !           506: putbq(Blist *q, Block *bp)
        !           507: {
        !           508:        lock(q);
        !           509:        if(q->first)
        !           510:                bp->next = q->first;
        !           511:        else
        !           512:                q->last = bp;
        !           513:        q->first = bp;
        !           514:        q->len += BLEN(bp);
        !           515:        q->nb++;
        !           516:        unlock(q);
        !           517: }
        !           518: 
        !           519: /*
        !           520:  *  remove the first block from a queue
        !           521:  */
        !           522: Block *
        !           523: getq(Queue *q)
        !           524: {
        !           525:        Block *bp;
        !           526: 
        !           527:        lock(q);
        !           528:        bp = q->first;
        !           529:        if(bp) {
        !           530:                q->first = bp->next;
        !           531:                if(q->first == 0)
        !           532:                        q->last = 0;
        !           533:                q->len -= BLEN(bp);
        !           534:                q->nb--;
        !           535:                if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){
        !           536:                        wakeup(q->other->next->other->rp);
        !           537:                        q->flag &= ~QHIWAT;
        !           538:                }
        !           539:                bp->next = 0;
        !           540:        }
        !           541:        unlock(q);
        !           542:        return bp;
        !           543: }
        !           544: 
        !           545: /*
        !           546:  *  grab all the blocks in a queue
        !           547:  */
        !           548: Block *
        !           549: grabq(Queue *q)
        !           550: {
        !           551:        Block *bp;
        !           552: 
        !           553:        lock(q);
        !           554:        bp = q->first;
        !           555:        if(bp){
        !           556:                q->first = 0;
        !           557:                q->last = 0;
        !           558:                q->len = 0;
        !           559:                q->nb = 0;
        !           560:                if(q->flag&QHIWAT){
        !           561:                        wakeup(q->other->next->other->rp);
        !           562:                        q->flag &= ~QHIWAT;
        !           563:                }
        !           564:        }
        !           565:        unlock(q);
        !           566:        return bp;
        !           567: }
        !           568: 
        !           569: /*
        !           570:  *  remove the first block from a list of blocks
        !           571:  */
        !           572: Block *
        !           573: getb(Blist *q)
        !           574: {
        !           575:        Block *bp;
        !           576: 
        !           577:        bp = q->first;
        !           578:        if(bp) {
        !           579:                q->first = bp->next;
        !           580:                if(q->first == 0)
        !           581:                        q->last = 0;
        !           582:                q->len -= BLEN(bp);
        !           583:                bp->next = 0;
        !           584:        }
        !           585:        return bp;
        !           586: }
        !           587: 
        !           588: 
        !           589: /*
        !           590:  *  put a block into the bit bucket
        !           591:  */
        !           592: void
        !           593: nullput(Queue *q, Block *bp)
        !           594: {
        !           595:        USED(q);
        !           596:        if(bp->type == M_HANGUP)
        !           597:                freeb(bp);
        !           598:        else {
        !           599:                freeb(bp);
        !           600:                error(Ehungup);
        !           601:        }
        !           602: }
        !           603: 
        !           604: /*
        !           605:  *  Part 3) Streams
        !           606:  */
        !           607: 
        !           608: /*
        !           609:  *  the per stream directory structure
        !           610:  */
        !           611: Dirtab streamdir[]={
        !           612:        "data",         {Sdataqid},     0,                      0600,
        !           613:        "ctl",          {Sctlqid},      0,                      0600,
        !           614: };
        !           615: 
        !           616: /*
        !           617:  *  hash buckets containing all streams
        !           618:  */
        !           619: enum
        !           620: {
        !           621:        Nbits=  5,
        !           622:        Nhash=  1<<Nbits,
        !           623:        Nmask=  Nhash-1,
        !           624: };
        !           625: typedef struct Sthash Sthash;
        !           626: struct Sthash
        !           627: {
        !           628:        QLock;
        !           629:        Stream  *s;
        !           630: };
        !           631: static Sthash ht[Nhash];
        !           632: 
        !           633: static void    hangup(Stream*);
        !           634: 
        !           635: /*
        !           636:  *  A stream device consists of the contents of streamdir plus
        !           637:  *  any directory supplied by the actual device.
        !           638:  *
        !           639:  *  values of s:
        !           640:  *     0 to ntab-1 apply to the auxiliary directory.
        !           641:  *     ntab to ntab+Shighqid-Slowqid+1 apply to streamdir.
        !           642:  */
        !           643: int
        !           644: streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp)
        !           645: {
        !           646:        if(s < ntab)
        !           647:                tab = &tab[s];
        !           648:        else if(s < ntab + Shighqid - Slowqid + 1)
        !           649:                tab = &streamdir[s - ntab];
        !           650:        else
        !           651:                return -1;
        !           652: 
        !           653:        devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, 
        !           654:                tab->name, tab->length, eve, tab->perm, dp);
        !           655:        return 1;
        !           656: }
        !           657: 
        !           658: /*
        !           659:  *  return a hash bucket for a stream
        !           660:  */
        !           661: static Sthash*
        !           662: hash(int type, int dev, int id)
        !           663: {
        !           664:        return &ht[(type*7*7 + dev*7 + id) & Nmask];
        !           665: }
        !           666: 
        !           667: /*
        !           668:  *  create a new stream, if noopen is non-zero, don't increment the open count
        !           669:  */
        !           670: Stream *
        !           671: streamnew(ushort type, ushort dev, ulong id, Qinfo *qi, int noopen)
        !           672: {
        !           673:        Stream *s;
        !           674:        Queue *q;
        !           675:        Sthash *hb;
        !           676: 
        !           677:        hb = hash(type, dev, id);
        !           678: 
        !           679:        /*
        !           680:         *  if the stream already exists, just increment the reference counts.
        !           681:         */
        !           682:        qlock(hb);
        !           683:        for(s = hb->s; s; s = s->next) {
        !           684:                if(s->type == type && s->dev == dev && s->id == id){
        !           685:                        s->inuse++;
        !           686:                        qunlock(hb);
        !           687:                        if(noopen == 0){
        !           688:                                qlock(s);
        !           689:                                s->opens++;
        !           690:                                qunlock(s);
        !           691:                        }
        !           692:                        return s;
        !           693:                }
        !           694:        }
        !           695: 
        !           696:        /*
        !           697:         *  create and init a new stream
        !           698:         */
        !           699:        s = smalloc(sizeof(Stream));
        !           700:        s->inuse = 1;
        !           701:        s->type = type;
        !           702:        s->dev = dev;
        !           703:        s->id = id;
        !           704:        s->err = 0;
        !           705:        s->hread = 0;
        !           706:        s->next = hb->s;
        !           707:        hb->s = s;
        !           708: 
        !           709:        /*
        !           710:         *  The ordering of these 2 instructions is very important.
        !           711:         *  It makes sure we finish the stream initialization before
        !           712:         *  anyone else can access it.
        !           713:         */
        !           714:        qlock(s);
        !           715:        qunlock(hb);
        !           716: 
        !           717:        if(waserror()){
        !           718:                qunlock(s);
        !           719:                streamclose1(s);
        !           720:                nexterror();
        !           721:        }
        !           722: 
        !           723:        /*
        !           724:         *  hang a device and process q off the stream
        !           725:         */
        !           726:        if(noopen)
        !           727:                s->opens = 0;
        !           728:        else
        !           729:                s->opens = 1;
        !           730:        q = allocq(&procinfo);
        !           731:        WR(q)->ptr = s;
        !           732:        RD(q)->ptr = s;
        !           733:        s->procq = WR(q);
        !           734:        q = allocq(qi);
        !           735:        s->devq = RD(q);
        !           736:        WR(s->procq)->next = WR(s->devq);
        !           737:        RD(s->procq)->next = 0;
        !           738:        RD(s->devq)->next = RD(s->procq);
        !           739:        WR(s->devq)->next = 0;
        !           740: 
        !           741:        if(qi->open)
        !           742:                (*qi->open)(RD(s->devq), s);
        !           743: 
        !           744:        qunlock(s);
        !           745:        poperror();
        !           746:        return s;
        !           747: }
        !           748: 
        !           749: /*
        !           750:  *  Associate a stream with a channel
        !           751:  */
        !           752: void
        !           753: streamopen(Chan *c, Qinfo *qi)
        !           754: {
        !           755:        c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0);
        !           756: }
        !           757: 
        !           758: /*
        !           759:  *  Enter a stream only if the stream exists and is open.  Increment the
        !           760:  *  reference count so it can't disappear under foot.
        !           761:  *
        !           762:  *  Return -1 if the stream no longer exists or is not opened.
        !           763:  */
        !           764: int
        !           765: streamenter(Stream *s)
        !           766: {
        !           767:        Sthash *hb;
        !           768:        Stream *ns;
        !           769: 
        !           770:        hb = hash(s->type, s->dev, s->id);
        !           771:        qlock(hb);
        !           772:        for(ns = hb->s; ns; ns = ns->next)
        !           773:                if(s->type == ns->type && s->dev == ns->dev && s->id == ns->id){
        !           774:                        s->inuse++;
        !           775:                        qunlock(hb);
        !           776:                        if(s->opens == 0){
        !           777:                                streamexit(s);
        !           778:                                return -1;
        !           779:                        }
        !           780:                        return 0;
        !           781:                }
        !           782:        qunlock(hb);
        !           783:        return -1;
        !           784: }
        !           785: 
        !           786: /*
        !           787:  *  Decrement the reference count on a stream.  If the count is
        !           788:  *  zero, free the stream.
        !           789:  */
        !           790: void
        !           791: streamexit(Stream *s)
        !           792: {
        !           793:        Queue *q;
        !           794:        Queue *nq;
        !           795:        Sthash *hb;
        !           796:        Stream **l, *ns;
        !           797: 
        !           798:        hb = hash(s->type, s->dev, s->id);
        !           799:        qlock(hb);
        !           800:        if(s->inuse-- == 1){
        !           801:                if(s->opens != 0)
        !           802:                        panic("streamexit %d %s\n", s->opens, s->devq->info->name);
        !           803: 
        !           804:                /*
        !           805:                 *  ascend the stream freeing the queues
        !           806:                 */
        !           807:                for(q = s->devq; q; q = nq){
        !           808:                        nq = q->next;
        !           809:                        freeq(q);
        !           810:                }
        !           811:                if(s->err)
        !           812:                        freeb(s->err);
        !           813: 
        !           814:                /*
        !           815:                 *  unchain it from the hash bucket and free
        !           816:                 */
        !           817:                l = &hb->s;
        !           818:                for(ns = hb->s; ns; ns = ns->next){
        !           819:                        if(s == ns){
        !           820:                                *l = s->next;
        !           821:                                break;
        !           822:                        }
        !           823:                        l = &ns->next;
        !           824:                }
        !           825:                free(s);
        !           826:        }
        !           827:        qunlock(hb);
        !           828: }
        !           829: 
        !           830: /*
        !           831:  *  nail down a stream so that it can't be closed
        !           832:  */
        !           833: void
        !           834: naildownstream(Stream *s)
        !           835: {
        !           836:        s->opens++;
        !           837:        s->inuse++;
        !           838: }
        !           839: 
        !           840: /*
        !           841:  *  Decrement the open count.  When it goes to zero, call the close
        !           842:  *  routines for each queue in the stream.
        !           843:  */
        !           844: int
        !           845: streamclose1(Stream *s)
        !           846: {
        !           847:        Queue *q, *nq;
        !           848:        int rv;
        !           849: 
        !           850:        /*
        !           851:         *  decrement the open count
        !           852:         */
        !           853:        qlock(s);
        !           854:        if(s->opens-- == 1){
        !           855:                /*
        !           856:                 *  descend the stream closing the queues
        !           857:                 */
        !           858:                for(q = s->procq; q; q = q->next){
        !           859:                        if(!waserror()){
        !           860:                                if(q->info->close)
        !           861:                                        (*q->info->close)(q->other);
        !           862:                                poperror();
        !           863:                        }
        !           864:                        WR(q)->put = nullput;
        !           865: 
        !           866:                        /*
        !           867:                         *  this may be 2 streams joined device end to device end
        !           868:                         */
        !           869:                        if(q == s->devq->other)
        !           870:                                break;
        !           871:                }
        !           872:        
        !           873:                /*
        !           874:                 *  ascend the stream flushing the queues
        !           875:                 */
        !           876:                for(q = s->devq; q; q = nq){
        !           877:                        nq = q->next;
        !           878:                        flushq(q);
        !           879:                }
        !           880:        }
        !           881:        rv = s->opens;
        !           882:        qunlock(s);
        !           883: 
        !           884:        /*
        !           885:         *  leave it and free it
        !           886:         */
        !           887:        streamexit(s);
        !           888:        return rv;
        !           889: }
        !           890: int
        !           891: streamclose(Chan *c)
        !           892: {
        !           893:        /*
        !           894:         *  if no stream, ignore it
        !           895:         */
        !           896:        if(!c->stream)
        !           897:                return 0;
        !           898:        return streamclose1(c->stream);
        !           899: }
        !           900: 
        !           901: /*
        !           902:  *  put a block to be read into the queue.  wakeup any waiting reader
        !           903:  */
        !           904: void
        !           905: stputq(Queue *q, Block *bp)
        !           906: {
        !           907:        int awaken;
        !           908:        Stream *s;
        !           909: 
        !           910:        if(bp->type == M_HANGUP){
        !           911:                s = q->ptr;
        !           912:                if(bp->rptr<bp->wptr && s->err==0)
        !           913:                        s->err = bp;
        !           914:                else
        !           915:                        freeb(bp);
        !           916:                q->flag |= QHUNGUP;
        !           917:                q->other->flag |= QHUNGUP;
        !           918:                wakeup(q->other->rp);
        !           919:                awaken = 1;
        !           920:        } else {
        !           921:                lock(q);
        !           922:                if(q->first)
        !           923:                        q->last->next = bp;
        !           924:                else
        !           925:                        q->first = bp;
        !           926:                q->len += BLEN(bp);
        !           927:                q->nb++;
        !           928:                awaken = bp->flags & S_DELIM;
        !           929:                while(bp->next) {
        !           930:                        bp = bp->next;
        !           931:                        q->len += BLEN(bp);
        !           932:                        q->nb++;
        !           933:                        awaken |= bp->flags & S_DELIM;
        !           934:                }
        !           935:                q->last = bp;
        !           936:                if(q->len >= Streamhi || q->nb >= Streambhi){
        !           937:                        q->flag |= QHIWAT;
        !           938:                        awaken = 1;
        !           939:                }
        !           940:                unlock(q);
        !           941:        }
        !           942:        if(awaken)
        !           943:                wakeup(q->rp);
        !           944: }
        !           945: 
        !           946: /*
        !           947:  *  return the stream id
        !           948:  */
        !           949: long
        !           950: streamctlread(Chan *c, void *vbuf, long n)
        !           951: {
        !           952:        char *buf = vbuf;
        !           953:        char num[32];
        !           954:        Stream *s;
        !           955: 
        !           956:        s = c->stream;
        !           957:        if(STREAMTYPE(c->qid.path) == Sctlqid){
        !           958:                sprint(num, "%d", s->id);
        !           959:                return readstr(c->offset, buf, n, num);
        !           960:        } else {
        !           961:                if(CHDIR & c->qid.path)
        !           962:                        return devdirread(c, vbuf, n, 0, 0, streamgen);
        !           963:                else
        !           964:                        panic("streamctlread");
        !           965:        }
        !           966:        return 0;       /* not reached */
        !           967: }
        !           968: 
        !           969: /*
        !           970:  *  return true if there is an output buffer available
        !           971:  */
        !           972: static int
        !           973: isinput(void *x)
        !           974: {
        !           975:        Queue *q;
        !           976: 
        !           977:        q = (Queue *)x;
        !           978:        return (q->flag&QHUNGUP) || q->first!=0;
        !           979: }
        !           980: 
        !           981: /*
        !           982:  *  read until we fill the buffer or until a DELIM is encountered
        !           983:  */
        !           984: long
        !           985: streamread(Chan *c, void *vbuf, long n)
        !           986: {
        !           987:        Block *bp;
        !           988:        Block *tofree;
        !           989:        Stream *s;
        !           990:        Queue *q;
        !           991:        int left, i;
        !           992:        uchar *buf = vbuf;
        !           993: 
        !           994:        if(STREAMTYPE(c->qid.path) != Sdataqid)
        !           995:                return streamctlread(c, vbuf, n);
        !           996: 
        !           997:        /*
        !           998:         *  one reader at a time
        !           999:         */
        !          1000:        s = c->stream;
        !          1001:        left = n;
        !          1002:        qlock(&s->rdlock);
        !          1003:        tofree = 0;
        !          1004:        q = 0;
        !          1005:        if(waserror()){
        !          1006:                /*
        !          1007:                 *  put any partially read message back into the
        !          1008:                 *  queue
        !          1009:                 */
        !          1010:                while(tofree){
        !          1011:                        bp = tofree;
        !          1012:                        tofree = bp->next;
        !          1013:                        bp->next = 0;
        !          1014:                        putbq(q, bp);
        !          1015:                }
        !          1016:                qunlock(&s->rdlock);
        !          1017:                nexterror();
        !          1018:        }
        !          1019: 
        !          1020:        /*
        !          1021:         *  sleep till data is available
        !          1022:         */
        !          1023:        q = RD(s->procq);
        !          1024:        while(left){
        !          1025:                bp = getq(q);
        !          1026:                if(bp == 0){
        !          1027:                        if(q->flag & QHUNGUP){
        !          1028:                                if(s->err)
        !          1029:                                        error((char*)s->err->rptr);
        !          1030:                                else if(s->hread++<3)
        !          1031:                                        break;
        !          1032:                                else
        !          1033:                                        error(Ehungup);
        !          1034:                        }
        !          1035:                        q->rp = &q->r;
        !          1036:                        sleep(q->rp, isinput, (void *)q);
        !          1037:                        continue;
        !          1038:                }
        !          1039: 
        !          1040:                i = BLEN(bp);
        !          1041:                if(i <= left){
        !          1042:                        memmove(buf, bp->rptr, i);
        !          1043:                        left -= i;
        !          1044:                        buf += i;
        !          1045:                        bp->next = tofree;
        !          1046:                        tofree = bp;
        !          1047:                        if(bp->flags & S_DELIM)
        !          1048:                                break;
        !          1049:                } else {
        !          1050:                        memmove(buf, bp->rptr, left);
        !          1051:                        bp->rptr += left;
        !          1052:                        putbq(q, bp);
        !          1053:                        left = 0;
        !          1054:                }
        !          1055:        }
        !          1056: 
        !          1057:        /*
        !          1058:         *  free completely read blocks
        !          1059:         */
        !          1060:        if(tofree)
        !          1061:                freeb(tofree);
        !          1062: 
        !          1063:        qunlock(&s->rdlock);
        !          1064:        poperror();
        !          1065:        return n - left;        
        !          1066: }
        !          1067: 
        !          1068: /*
        !          1069:  *  look for an instance of the line discipline `name' on
        !          1070:  *  the stream `s'
        !          1071:  */
        !          1072: void
        !          1073: qlook(Stream *s, char *name)
        !          1074: {
        !          1075:        Queue *q;
        !          1076: 
        !          1077:        for(q = s->procq; q; q = q->next){
        !          1078:                if(strcmp(q->info->name, name) == 0)
        !          1079:                        return;
        !          1080: 
        !          1081:                /*
        !          1082:                 *  this may be 2 streams joined device end to device end
        !          1083:                 */
        !          1084:                if(q == s->devq->other)
        !          1085:                        break;
        !          1086:        }
        !          1087:        error(Ebadarg);
        !          1088: }
        !          1089: 
        !          1090: /*
        !          1091:  *  Handle a ctl request.  Streamwide requests are:
        !          1092:  *
        !          1093:  *     hangup                  -- send an M_HANGUP up the stream
        !          1094:  *     push ldname             -- push the line discipline named ldname
        !          1095:  *     pop                     -- pop a line discipline
        !          1096:  *     look ldname             -- look for a line discipline
        !          1097:  *
        !          1098:  *  This routing is entered with s->wrlock'ed and must unlock.
        !          1099:  */
        !          1100: static long
        !          1101: streamctlwrite(Chan *c, void *a, long n)
        !          1102: {
        !          1103:        Qinfo *qi;
        !          1104:        Block *bp;
        !          1105:        Stream *s;
        !          1106: 
        !          1107:        if(STREAMTYPE(c->qid.path) != Sctlqid)
        !          1108:                panic("streamctlwrite %lux", c->qid);
        !          1109:        s = c->stream;
        !          1110: 
        !          1111:        /*
        !          1112:         *  package
        !          1113:         */
        !          1114:        bp = allocb(n+1);
        !          1115:        memmove(bp->wptr, a, n);
        !          1116:        bp->wptr[n] = 0;
        !          1117:        bp->wptr += n + 1;
        !          1118: 
        !          1119:        /*
        !          1120:         *  check for standard requests
        !          1121:         */
        !          1122:        if(streamparse("hangup", bp)){
        !          1123:                hangup(s);
        !          1124:                freeb(bp);
        !          1125:        } else if(streamparse("push", bp)){
        !          1126:                qi = qinfofind((char *)bp->rptr);
        !          1127:                if(qi == 0)
        !          1128:                        error(Ebadld);
        !          1129:                pushq(s, qi);
        !          1130:                freeb(bp);
        !          1131:        } else if(streamparse("pop", bp)){
        !          1132:                popq(s);
        !          1133:                freeb(bp);
        !          1134:        } else if(streamparse("look", bp)){
        !          1135:                qlook(s, (char *)bp->rptr);
        !          1136:                freeb(bp);
        !          1137:        } else {
        !          1138:                bp->type = M_CTL;
        !          1139:                bp->flags |= S_DELIM;
        !          1140:                PUTNEXT(s->procq, bp);
        !          1141:        }
        !          1142: 
        !          1143:        return n;
        !          1144: }
        !          1145: 
        !          1146: /*
        !          1147:  *  wait till there's room in the next stream
        !          1148:  */
        !          1149: static int
        !          1150: notfull(void *arg)
        !          1151: {
        !          1152:        return !QFULL((Queue *)arg);
        !          1153: }
        !          1154: void
        !          1155: flowctl(Queue *q, Block *bp)
        !          1156: {
        !          1157:        if(bp->type != M_HANGUP){
        !          1158:                qlock(&q->rlock);
        !          1159:                if(waserror()){
        !          1160:                        qunlock(&q->rlock);
        !          1161:                        freeb(bp);
        !          1162:                        nexterror();
        !          1163:                }
        !          1164:                q->rp = &q->r;
        !          1165:                sleep(q->rp, notfull, q->next);
        !          1166:                qunlock(&q->rlock);
        !          1167:                poperror();
        !          1168:        }
        !          1169:        PUTNEXT(q, bp);
        !          1170: }
        !          1171: 
        !          1172: /*
        !          1173:  *  send the request as a single delimited block
        !          1174:  */
        !          1175: long
        !          1176: streamwrite(Chan *c, void *a, long n, int docopy)
        !          1177: {
        !          1178:        Stream *s;
        !          1179:        Queue *q;
        !          1180:        long rem;
        !          1181:        int i;
        !          1182:        Block *bp;
        !          1183:        char *va;
        !          1184: 
        !          1185:        /*
        !          1186:         *  docopy will get used if I ever figure out when to avoid copying
        !          1187:         *  data. -- presotto
        !          1188:         */
        !          1189:        USED(docopy);
        !          1190: 
        !          1191:        s = c->stream;
        !          1192: 
        !          1193:        /*
        !          1194:         *  decode the qid
        !          1195:         */
        !          1196:        if(STREAMTYPE(c->qid.path) != Sdataqid)
        !          1197:                return streamctlwrite(c, a, n);
        !          1198: 
        !          1199:        /*
        !          1200:         *  No writes allowed on hungup channels
        !          1201:         */
        !          1202:        q = s->procq;
        !          1203:        if(q->other->flag & QHUNGUP){
        !          1204:                if(s->err)
        !          1205:                        error((char*)(s->err->rptr));
        !          1206:                else
        !          1207:                        error(Ehungup);
        !          1208:        }
        !          1209: 
        !          1210:        /*
        !          1211:         *  Write the message using blocks <= Streamhi bytes longs
        !          1212:         */
        !          1213:        va = a;
        !          1214:        rem = n;
        !          1215:        for(;;){
        !          1216:                if(rem > Streamhi)
        !          1217:                        i = Streamhi;
        !          1218:                else
        !          1219:                        i = rem;
        !          1220:                bp = allocb(i);
        !          1221:                memmove(bp->wptr, va, i);
        !          1222:                bp->wptr += i;
        !          1223:                va += i;
        !          1224:                rem -= i;
        !          1225:                if(rem > 0){
        !          1226:                        FLOWCTL(q, bp);
        !          1227:                } else {
        !          1228:                        bp->flags |= S_DELIM;
        !          1229:                        FLOWCTL(q, bp);
        !          1230:                        break;
        !          1231:                }
        !          1232:        }
        !          1233:        return n;
        !          1234: }
        !          1235: 
        !          1236: /*
        !          1237:  *  stat a stream.  the length is the number of bytes up to the
        !          1238:  *  first delimiter.
        !          1239:  */
        !          1240: void
        !          1241: streamstat(Chan *c, char *db, char *name, long perm)
        !          1242: {
        !          1243:        Dir dir;
        !          1244:        Stream *s;
        !          1245:        Queue *q;
        !          1246:        Block *bp;
        !          1247:        long n;
        !          1248: 
        !          1249:        s = c->stream;
        !          1250:        n = 0;
        !          1251:        if(s) {
        !          1252:                q = RD(s->procq);
        !          1253:                if(q->flag & QHUNGUP)
        !          1254:                        error(Ehungup);
        !          1255:                lock(q);
        !          1256:                for(bp=q->first; bp; bp = bp->next){
        !          1257:                        n += BLEN(bp);
        !          1258:                        if(bp->flags&S_DELIM)
        !          1259:                                break;
        !          1260:                }
        !          1261:                unlock(q);
        !          1262:        }
        !          1263: 
        !          1264:        devdir(c, c->qid, name, n, eve, perm, &dir);
        !          1265:        convD2M(&dir, db);
        !          1266: }
        !          1267: 
        !          1268: /*
        !          1269:  *  send a hangup up a stream
        !          1270:  */
        !          1271: static void
        !          1272: hangup(Stream *s)
        !          1273: {
        !          1274:        Block *bp;
        !          1275: 
        !          1276:        bp = allocb(0);
        !          1277:        bp->type = M_HANGUP;
        !          1278:        if(s->devq && s->devq->put)
        !          1279:                (*s->devq->put)(s->devq, bp);
        !          1280:        else
        !          1281:                freeb(bp);
        !          1282: }
        !          1283: 
        !          1284: /*
        !          1285:  *  parse a string and return a pointer to the second element if the 
        !          1286:  *  first matches name.  bp->rptr will be updated to point to the
        !          1287:  *  second element.
        !          1288:  *
        !          1289:  *  return 0 if no match.
        !          1290:  *
        !          1291:  *  it is assumed that the block data is null terminated.  streamwrite
        !          1292:  *  guarantees this.
        !          1293:  */
        !          1294: int
        !          1295: streamparse(char *name, Block *bp)
        !          1296: {
        !          1297:        int len;
        !          1298: 
        !          1299:        len = strlen(name);
        !          1300:        if(BLEN(bp) < len)
        !          1301:                return 0;
        !          1302:        if(strncmp(name, (char *)bp->rptr, len)==0){
        !          1303:                if(bp->rptr[len] == ' ')
        !          1304:                        bp->rptr += len+1;
        !          1305:                else if(bp->rptr[len])
        !          1306:                        return 0;
        !          1307:                else
        !          1308:                        bp->rptr += len;
        !          1309:                while(*bp->rptr==' ' && bp->wptr>bp->rptr)
        !          1310:                        bp->rptr++;
        !          1311:                return 1;
        !          1312:        }
        !          1313:        return 0;
        !          1314: }
        !          1315: 
        !          1316: /*
        !          1317:  *  like andrew's getmfields but no hidden state
        !          1318:  */
        !          1319: int
        !          1320: getfields(char *lp, char **fields, int n, char *sep)
        !          1321: {
        !          1322:        int i;
        !          1323: 
        !          1324:        for(i=0; lp && *lp && i<n; i++){
        !          1325:                while(*lp && strchr(sep, *lp) != 0)
        !          1326:                        *lp++=0;
        !          1327:                if(*lp == 0)
        !          1328:                        break;
        !          1329:                fields[i]=lp;
        !          1330:                while(*lp && strchr(sep, *lp) == 0)
        !          1331:                        lp++;
        !          1332:        }
        !          1333:        return i;
        !          1334: }
        !          1335: 
        !          1336: static Streamopen      permstopen;
        !          1337: static Streamput       permstput;
        !          1338: 
        !          1339: /*
        !          1340:  *  pushing this line discipline makes the stream unclosable, i.e., always there
        !          1341:  */
        !          1342: Qinfo perminfo =
        !          1343: {
        !          1344:        permstput,
        !          1345:        permstput,
        !          1346:        permstopen,
        !          1347:        0,
        !          1348:        "permanent"
        !          1349: };
        !          1350: 
        !          1351: static void
        !          1352: permstopen(Queue *q, Stream *s)
        !          1353: {
        !          1354:        USED(q);
        !          1355:        s->opens++;
        !          1356:        s->inuse++;
        !          1357: }
        !          1358: 
        !          1359: static void
        !          1360: permstput(Queue *q, Block *bp)
        !          1361: {
        !          1362:        PUTNEXT(q, bp);
        !          1363: }

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.