Annotation of lucent/sys/src/9/port/stream.c, revision 1.1.1.1

1.1       root        1: #include       "u.h"
                      2: #include       "../port/lib.h"
                      3: #include       "mem.h"
                      4: #include       "dat.h"
                      5: #include       "fns.h"
                      6: #include       "io.h"
                      7: #include       "../port/error.h"
                      8: #include       "devtab.h"
                      9: 
                     10: /*
                     11:  *  Part 1) Blocks
                     12:  */
                     13: 
                     14: /*
                     15:  *  Allocate a block.  Put the data portion at the end of the smalloc'd
                     16:  *  chunk so that it can easily grow from the front to add protocol
                     17:  *  headers.  Thank Larry Peterson for the suggestion.
                     18:  */
                     19: Block *
                     20: allocb(ulong size)
                     21: {
                     22:        Block *bp;
                     23:        uchar *base, *lim;
                     24:        int n;
                     25: 
                     26:        bp = smalloc(sizeof(Block)+size);
                     27: 
                     28:        base = (uchar*)bp + sizeof(Block);
                     29:        n = msize(bp);
                     30:        lim = (uchar*)bp + n;
                     31:        n -= size + sizeof(Block);
                     32:        if(n > 0)
                     33:                memset(lim - n, 0, n);
                     34:        bp->wptr = bp->rptr = lim - size;
                     35:        bp->base = base;
                     36:        bp->lim = lim;
                     37:        bp->flags = 0;
                     38:        bp->next = 0;
                     39:        bp->list = 0;
                     40:        bp->type = M_DATA;
                     41:        bp->pc = getcallerpc(((uchar*)&size) - sizeof(size));
                     42:        return bp;
                     43: }
                     44: 
                     45: /*
                     46:  *  Free a block (or list of blocks).  Poison its pointers so that
                     47:  *  someone trying to access it after freeing will cause a panic.
                     48:  */
                     49: void
                     50: freeb(Block *bp)
                     51: {
                     52:        Block *next;
                     53: 
                     54:        while(bp){
                     55:                bp->rptr = 0;
                     56:                bp->wptr = 0;
                     57:                next = bp->next;
                     58:                free(bp);
                     59:                bp = next;      
                     60:        }
                     61: }
                     62: 
                     63: /*
                     64:  *  Pad a block to the front with n bytes.  This is used to add protocol
                     65:  *  headers to the front of blocks.
                     66:  */
                     67: Block *
                     68: padb(Block *bp, int n)
                     69: {
                     70:        Block *nbp;
                     71: 
                     72:        if(bp->base && bp->rptr-bp->base>=n){
                     73:                bp->rptr -= n;
                     74:                return bp;
                     75:        } else {
                     76:                nbp = allocb(n);
                     77:                nbp->wptr = nbp->lim;
                     78:                nbp->rptr = nbp->wptr - n;
                     79:                nbp->next = bp;
                     80:                return nbp;
                     81:        }
                     82: } 
                     83: 
                     84: /*
                     85:  *  make sure the first block has n bytes
                     86:  */
                     87: Block *
                     88: pullup(Block *bp, int n)
                     89: {
                     90:        Block *nbp;
                     91:        int i;
                     92: 
                     93:        /*
                     94:         *  this should almost always be true, the rest it
                     95:         *  just for to avoid every caller checking.
                     96:         */
                     97:        if(BLEN(bp) >= n)
                     98:                return bp;
                     99: 
                    100:        /*
                    101:         *  if not enough room in the first block,
                    102:         *  add another to the front of the list.
                    103:         */
                    104:        if(bp->lim - bp->rptr < n){
                    105:                nbp = allocb(n);
                    106:                nbp->next = bp;
                    107:                bp = nbp;
                    108:        }
                    109: 
                    110:        /*
                    111:         *  copy bytes from the trailing blocks into the first
                    112:         */
                    113:        n -= BLEN(bp);
                    114:        while(nbp = bp->next){
                    115:                i = BLEN(nbp);
                    116:                if(i >= n) {
                    117:                        memmove(bp->wptr, nbp->rptr, n);
                    118:                        bp->wptr += n;
                    119:                        nbp->rptr += n;
                    120:                        return bp;
                    121:                } else {
                    122:                        memmove(bp->wptr, nbp->rptr, i);
                    123:                        bp->wptr += i;
                    124:                        bp->next = nbp->next;
                    125:                        nbp->next = 0;
                    126:                        freeb(nbp);
                    127:                        n -= i;
                    128:                }
                    129:        }
                    130:        freeb(bp);
                    131:        return 0;
                    132: }
                    133: 
                    134: /*
                    135:  *  return the number of data bytes of a list of blocks
                    136:  */
                    137: int
                    138: blen(Block *bp)
                    139: {
                    140:        int len;
                    141: 
                    142:        len = 0;
                    143:        while(bp) {
                    144:                len += BLEN(bp);
                    145:                bp = bp->next;
                    146:        }
                    147: 
                    148:        return len;
                    149: }
                    150: 
                    151: /*
                    152:  *  round a block chain to some even number of bytes.  Used
                    153:  *  by devip.c becuase all IP packets must have an even number
                    154:  *  of bytes.
                    155:  *
                    156:  *  The last block in the returned chain will have S_DELIM set.
                    157:  */
                    158: int
                    159: bround(Block *bp, int amount)
                    160: {
                    161:        Block *last;
                    162:        int len, pad;
                    163: 
                    164:        len = 0;
                    165:        SET(last);      /* Ken's magic */
                    166: 
                    167:        while(bp) {
                    168:                len += BLEN(bp);
                    169:                last = bp;
                    170:                bp = bp->next;
                    171:        }
                    172: 
                    173:        pad = ((len + amount) & ~amount) - len;
                    174:        if(pad) {
                    175:                if(last->lim - last->wptr >= pad){
                    176:                        memset(last->wptr, 0, pad);
                    177:                        last->wptr += pad;
                    178:                } else {
                    179:                        last->next = allocb(pad);
                    180:                        last->flags &= ~S_DELIM;
                    181:                        last = last->next;
                    182:                        last->wptr += pad;
                    183:                        last->flags |= S_DELIM;
                    184:                }
                    185:        }
                    186: 
                    187:        return len + pad;
                    188: }
                    189: 
                    190: /*
                    191:  *  expand a block list to be one block, len bytes long.  used by
                    192:  *  ethernet routines.
                    193:  */
                    194: Block*
                    195: expandb(Block *bp, int len)
                    196: {
                    197:        Block *nbp, *new;
                    198:        int i;
                    199:        ulong delim = 0;
                    200: 
                    201:        new = allocb(len);
                    202:        if(new == 0){
                    203:                freeb(bp);
                    204:                return 0;
                    205:        }
                    206: 
                    207:        /*
                    208:         *  copy bytes into new block
                    209:         */
                    210:        for(nbp = bp; len>0 && nbp; nbp = nbp->next){
                    211:                delim = nbp->flags & S_DELIM;
                    212:                i = BLEN(nbp);
                    213:                if(i > len) {
                    214:                        memmove(new->wptr, nbp->rptr, len);
                    215:                        new->wptr += len;
                    216:                        break;
                    217:                } else {
                    218:                        memmove(new->wptr, nbp->rptr, i);
                    219:                        new->wptr += i;
                    220:                        len -= i;
                    221:                }
                    222:        }
                    223:        if(len){
                    224:                memset(new->wptr, 0, len);
                    225:                new->wptr += len;
                    226:        }
                    227:        new->flags |= delim;
                    228:        freeb(bp);
                    229:        return new;
                    230: 
                    231: }
                    232: 
                    233: /*
                    234:  *  make a copy of the first 'count' bytes of a block chain.  Use
                    235:  *  by transport protocols.
                    236:  */
                    237: Block *
                    238: copyb(Block *bp, int count)
                    239: {
                    240:        Block *nb, *head, **p;
                    241:        int l;
                    242: 
                    243:        p = &head;
                    244:        while(count) {
                    245:                l = BLEN(bp);
                    246:                if(count < l)
                    247:                        l = count;
                    248:                nb = allocb(l);
                    249:                if(nb == 0)
                    250:                        panic("copyb.1");
                    251:                memmove(nb->wptr, bp->rptr, l);
                    252:                nb->wptr += l;
                    253:                count -= l;
                    254:                if(bp->flags & S_DELIM)
                    255:                        nb->flags |= S_DELIM;
                    256:                *p = nb;
                    257:                p = &nb->next;
                    258:                bp = bp->next;
                    259:                if(bp == 0)
                    260:                        break;
                    261:        }
                    262:        if(count) {
                    263:                nb = allocb(count);
                    264:                if(nb == 0)
                    265:                        panic("copyb.2");
                    266:                memset(nb->wptr, 0, count);
                    267:                nb->wptr += count;
                    268:                nb->flags |= S_DELIM;
                    269:                *p = nb;
                    270:        }
                    271:        if(blen(head) == 0)
                    272:                print("copyb: zero length\n");
                    273: 
                    274:        return head;
                    275: }
                    276: 
                    277: /*
                    278:  *  Part 2) Queues
                    279:  */
                    280: 
                    281: /*
                    282:  *  process end line discipline
                    283:  */
                    284: static Streamput stputq;
                    285: Qinfo procinfo =
                    286: {
                    287:        stputq,
                    288:        nullput,
                    289:        0,
                    290:        0,
                    291:        "process"
                    292: };
                    293: 
                    294: /*
                    295:  *  line disciplines that can be pushed
                    296:  */
                    297: static Qinfo *lds;
                    298: 
                    299: /*
                    300:  *  make known a stream module and call its initialization routine, if
                    301:  *  it has one.
                    302:  */
                    303: void
                    304: newqinfo(Qinfo *qi)
                    305: {
                    306:        if(qi->next)
                    307:                panic("newqinfo: already configured");
                    308: 
                    309:        qi->next = lds;
                    310:        lds = qi;
                    311:        if(qi->reset)
                    312:                (*qi->reset)();
                    313: }
                    314: 
                    315: /*
                    316:  *  find the info structure for line discipline 'name'
                    317:  */
                    318: Qinfo *
                    319: qinfofind(char *name)
                    320: {
                    321:        Qinfo *qi;
                    322: 
                    323:        if(name == 0)
                    324:                return 0;
                    325:        for(qi = lds; qi; qi = qi->next)
                    326:                if(strcmp(qi->name, name)==0)
                    327:                        return qi;
                    328:        return 0;
                    329: }
                    330: 
                    331: /*
                    332:  *  allocate a pair of queues.  flavor them with the requested put routines.
                    333:  *  the `QINUSE' flag on the read side is the only one used.
                    334:  */
                    335: static Queue *
                    336: allocq(Qinfo *qi)
                    337: {
                    338:        Queue *q, *wq;
                    339: 
                    340:        q = smalloc(2*sizeof(Queue));
                    341: 
                    342:        q->flag = QINUSE;
                    343:        q->r.p = 0;
                    344:        q->info = qi;
                    345:        q->put = qi->iput;
                    346:        q->len = q->nb = 0;
                    347:        q->ptr = 0;
                    348:        q->rp = &q->r;
                    349:        wq = q->other = q + 1;
                    350: 
                    351:        wq->flag = QINUSE;
                    352:        wq->r.p = 0;
                    353:        wq->info = qi;
                    354:        wq->put = qi->oput;
                    355:        wq->other = q;
                    356:        wq->ptr = 0;
                    357:        wq->len = wq->nb = 0;
                    358:        wq->rp = &wq->r;
                    359: 
                    360:        return q;
                    361: }
                    362: 
                    363: /*
                    364:  *  free a queue
                    365:  */
                    366: static void
                    367: freeq(Queue *q)
                    368: {
                    369:        Block *bp;
                    370: 
                    371:        q = RD(q);
                    372:        while(bp = getq(q))
                    373:                freeb(bp);
                    374:        q = WR(q);
                    375:        while(bp = getq(q))
                    376:                freeb(bp);
                    377:        free(RD(q));
                    378: }
                    379: 
                    380: /*
                    381:  *  flush a queue
                    382:  */
                    383: static void
                    384: flushq(Queue *q)
                    385: {
                    386:        Block *bp;
                    387: 
                    388:        q = RD(q);
                    389:        while(bp = getq(q))
                    390:                freeb(bp);
                    391:        q = WR(q);
                    392:        while(bp = getq(q))
                    393:                freeb(bp);
                    394: }
                    395: 
                    396: /*
                    397:  *  push a queue onto a stream referenced by the proc side write q
                    398:  */
                    399: Queue *
                    400: pushq(Stream* s, Qinfo *qi)
                    401: {
                    402:        Queue *q;
                    403:        Queue *nq;
                    404: 
                    405:        q = RD(s->procq);
                    406: 
                    407:        /*
                    408:         *  make the new queue
                    409:         */
                    410:        nq = allocq(qi);
                    411: 
                    412:        /*
                    413:         *  push
                    414:         */
                    415:        qlock(s);
                    416:        RD(nq)->next = q;
                    417:        RD(WR(q)->next)->next = RD(nq);
                    418:        WR(nq)->next = WR(q)->next;
                    419:        WR(q)->next = WR(nq);
                    420:        qunlock(s);
                    421: 
                    422:        if(qi->open)
                    423:                (*qi->open)(RD(nq), s);
                    424: 
                    425:        return WR(nq)->next;
                    426: }
                    427: 
                    428: /*
                    429:  *  pop off the top line discipline
                    430:  */
                    431: static void
                    432: popq(Stream *s)
                    433: {
                    434:        Queue *q;
                    435: 
                    436:        if(waserror()){
                    437:                qunlock(s);
                    438:                nexterror();
                    439:        }
                    440:        qlock(s);
                    441:        if(s->procq->next == WR(s->devq))
                    442:                error(Ebadld);
                    443:        q = s->procq->next;
                    444:        if(q->info->close)
                    445:                (*q->info->close)(RD(q));
                    446:        s->procq->next = q->next;
                    447:        RD(q->next)->next = RD(s->procq);
                    448:        qunlock(s);
                    449:        freeq(q);
                    450: }
                    451: 
                    452: /*
                    453:  *  add a block (or list of blocks) to the end of a queue.  return true
                    454:  *  if one of the blocks contained a delimiter. 
                    455:  */
                    456: int
                    457: putq(Queue *q, Block *bp)
                    458: {
                    459:        int delim;
                    460: 
                    461:        lock(q);
                    462:        if(q->first)
                    463:                q->last->next = bp;
                    464:        else
                    465:                q->first = bp;
                    466:        q->len += BLEN(bp);
                    467:        q->nb++;
                    468:        delim = bp->flags & S_DELIM;
                    469:        while(bp->next) {
                    470:                bp = bp->next;
                    471:                q->len += BLEN(bp);
                    472:                q->nb++;
                    473:                delim |= bp->flags & S_DELIM;
                    474:        }
                    475:        q->last = bp;
                    476:        if(q->len >= Streamhi || q->nb >= Streambhi)
                    477:                q->flag |= QHIWAT;
                    478:        unlock(q);
                    479:        return delim;
                    480: }
                    481: 
                    482: int
                    483: putb(Blist *q, Block *bp)
                    484: {
                    485:        int delim;
                    486: 
                    487:        if(q->first)
                    488:                q->last->next = bp;
                    489:        else
                    490:                q->first = bp;
                    491:        q->len += BLEN(bp);
                    492:        delim = bp->flags & S_DELIM;
                    493:        while(bp->next) {
                    494:                bp = bp->next;
                    495:                q->len += BLEN(bp);
                    496:                delim |= bp->flags & S_DELIM;
                    497:        }
                    498:        q->last = bp;
                    499:        return delim;
                    500: }
                    501: 
                    502: /*
                    503:  *  add a block to the start of a queue 
                    504:  */
                    505: void
                    506: putbq(Blist *q, Block *bp)
                    507: {
                    508:        lock(q);
                    509:        if(q->first)
                    510:                bp->next = q->first;
                    511:        else
                    512:                q->last = bp;
                    513:        q->first = bp;
                    514:        q->len += BLEN(bp);
                    515:        q->nb++;
                    516:        unlock(q);
                    517: }
                    518: 
                    519: /*
                    520:  *  remove the first block from a queue
                    521:  */
                    522: Block *
                    523: getq(Queue *q)
                    524: {
                    525:        Block *bp;
                    526: 
                    527:        lock(q);
                    528:        bp = q->first;
                    529:        if(bp) {
                    530:                q->first = bp->next;
                    531:                if(q->first == 0)
                    532:                        q->last = 0;
                    533:                q->len -= BLEN(bp);
                    534:                q->nb--;
                    535:                if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){
                    536:                        wakeup(q->other->next->other->rp);
                    537:                        q->flag &= ~QHIWAT;
                    538:                }
                    539:                bp->next = 0;
                    540:        }
                    541:        unlock(q);
                    542:        return bp;
                    543: }
                    544: 
                    545: /*
                    546:  *  grab all the blocks in a queue
                    547:  */
                    548: Block *
                    549: grabq(Queue *q)
                    550: {
                    551:        Block *bp;
                    552: 
                    553:        lock(q);
                    554:        bp = q->first;
                    555:        if(bp){
                    556:                q->first = 0;
                    557:                q->last = 0;
                    558:                q->len = 0;
                    559:                q->nb = 0;
                    560:                if(q->flag&QHIWAT){
                    561:                        wakeup(q->other->next->other->rp);
                    562:                        q->flag &= ~QHIWAT;
                    563:                }
                    564:        }
                    565:        unlock(q);
                    566:        return bp;
                    567: }
                    568: 
                    569: /*
                    570:  *  remove the first block from a list of blocks
                    571:  */
                    572: Block *
                    573: getb(Blist *q)
                    574: {
                    575:        Block *bp;
                    576: 
                    577:        bp = q->first;
                    578:        if(bp) {
                    579:                q->first = bp->next;
                    580:                if(q->first == 0)
                    581:                        q->last = 0;
                    582:                q->len -= BLEN(bp);
                    583:                bp->next = 0;
                    584:        }
                    585:        return bp;
                    586: }
                    587: 
                    588: 
                    589: /*
                    590:  *  put a block into the bit bucket
                    591:  */
                    592: void
                    593: nullput(Queue *q, Block *bp)
                    594: {
                    595:        USED(q);
                    596:        if(bp->type == M_HANGUP)
                    597:                freeb(bp);
                    598:        else {
                    599:                freeb(bp);
                    600:                error(Ehungup);
                    601:        }
                    602: }
                    603: 
                    604: /*
                    605:  *  Part 3) Streams
                    606:  */
                    607: 
                    608: /*
                    609:  *  the per stream directory structure
                    610:  */
                    611: Dirtab streamdir[]={
                    612:        "data",         {Sdataqid},     0,                      0600,
                    613:        "ctl",          {Sctlqid},      0,                      0600,
                    614: };
                    615: 
                    616: /*
                    617:  *  hash buckets containing all streams
                    618:  */
                    619: enum
                    620: {
                    621:        Nbits=  5,
                    622:        Nhash=  1<<Nbits,
                    623:        Nmask=  Nhash-1,
                    624: };
                    625: typedef struct Sthash Sthash;
                    626: struct Sthash
                    627: {
                    628:        QLock;
                    629:        Stream  *s;
                    630: };
                    631: static Sthash ht[Nhash];
                    632: 
                    633: static void    hangup(Stream*);
                    634: 
                    635: /*
                    636:  *  A stream device consists of the contents of streamdir plus
                    637:  *  any directory supplied by the actual device.
                    638:  *
                    639:  *  values of s:
                    640:  *     0 to ntab-1 apply to the auxiliary directory.
                    641:  *     ntab to ntab+Shighqid-Slowqid+1 apply to streamdir.
                    642:  */
                    643: int
                    644: streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp)
                    645: {
                    646:        if(s < ntab)
                    647:                tab = &tab[s];
                    648:        else if(s < ntab + Shighqid - Slowqid + 1)
                    649:                tab = &streamdir[s - ntab];
                    650:        else
                    651:                return -1;
                    652: 
                    653:        devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, 
                    654:                tab->name, tab->length, eve, tab->perm, dp);
                    655:        return 1;
                    656: }
                    657: 
                    658: /*
                    659:  *  return a hash bucket for a stream
                    660:  */
                    661: static Sthash*
                    662: hash(int type, int dev, int id)
                    663: {
                    664:        return &ht[(type*7*7 + dev*7 + id) & Nmask];
                    665: }
                    666: 
                    667: /*
                    668:  *  create a new stream, if noopen is non-zero, don't increment the open count
                    669:  */
                    670: Stream *
                    671: streamnew(ushort type, ushort dev, ulong id, Qinfo *qi, int noopen)
                    672: {
                    673:        Stream *s;
                    674:        Queue *q;
                    675:        Sthash *hb;
                    676: 
                    677:        hb = hash(type, dev, id);
                    678: 
                    679:        /*
                    680:         *  if the stream already exists, just increment the reference counts.
                    681:         */
                    682:        qlock(hb);
                    683:        for(s = hb->s; s; s = s->next) {
                    684:                if(s->type == type && s->dev == dev && s->id == id){
                    685:                        s->inuse++;
                    686:                        qunlock(hb);
                    687:                        if(noopen == 0){
                    688:                                qlock(s);
                    689:                                s->opens++;
                    690:                                qunlock(s);
                    691:                        }
                    692:                        return s;
                    693:                }
                    694:        }
                    695: 
                    696:        /*
                    697:         *  create and init a new stream
                    698:         */
                    699:        s = smalloc(sizeof(Stream));
                    700:        s->inuse = 1;
                    701:        s->type = type;
                    702:        s->dev = dev;
                    703:        s->id = id;
                    704:        s->err = 0;
                    705:        s->hread = 0;
                    706:        s->next = hb->s;
                    707:        hb->s = s;
                    708: 
                    709:        /*
                    710:         *  The ordering of these 2 instructions is very important.
                    711:         *  It makes sure we finish the stream initialization before
                    712:         *  anyone else can access it.
                    713:         */
                    714:        qlock(s);
                    715:        qunlock(hb);
                    716: 
                    717:        if(waserror()){
                    718:                qunlock(s);
                    719:                streamclose1(s);
                    720:                nexterror();
                    721:        }
                    722: 
                    723:        /*
                    724:         *  hang a device and process q off the stream
                    725:         */
                    726:        if(noopen)
                    727:                s->opens = 0;
                    728:        else
                    729:                s->opens = 1;
                    730:        q = allocq(&procinfo);
                    731:        WR(q)->ptr = s;
                    732:        RD(q)->ptr = s;
                    733:        s->procq = WR(q);
                    734:        q = allocq(qi);
                    735:        s->devq = RD(q);
                    736:        WR(s->procq)->next = WR(s->devq);
                    737:        RD(s->procq)->next = 0;
                    738:        RD(s->devq)->next = RD(s->procq);
                    739:        WR(s->devq)->next = 0;
                    740: 
                    741:        if(qi->open)
                    742:                (*qi->open)(RD(s->devq), s);
                    743: 
                    744:        qunlock(s);
                    745:        poperror();
                    746:        return s;
                    747: }
                    748: 
                    749: /*
                    750:  *  Associate a stream with a channel
                    751:  */
                    752: void
                    753: streamopen(Chan *c, Qinfo *qi)
                    754: {
                    755:        c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0);
                    756: }
                    757: 
                    758: /*
                    759:  *  Enter a stream only if the stream exists and is open.  Increment the
                    760:  *  reference count so it can't disappear under foot.
                    761:  *
                    762:  *  Return -1 if the stream no longer exists or is not opened.
                    763:  */
                    764: int
                    765: streamenter(Stream *s)
                    766: {
                    767:        Sthash *hb;
                    768:        Stream *ns;
                    769: 
                    770:        hb = hash(s->type, s->dev, s->id);
                    771:        qlock(hb);
                    772:        for(ns = hb->s; ns; ns = ns->next)
                    773:                if(s->type == ns->type && s->dev == ns->dev && s->id == ns->id){
                    774:                        s->inuse++;
                    775:                        qunlock(hb);
                    776:                        if(s->opens == 0){
                    777:                                streamexit(s);
                    778:                                return -1;
                    779:                        }
                    780:                        return 0;
                    781:                }
                    782:        qunlock(hb);
                    783:        return -1;
                    784: }
                    785: 
                    786: /*
                    787:  *  Decrement the reference count on a stream.  If the count is
                    788:  *  zero, free the stream.
                    789:  */
                    790: void
                    791: streamexit(Stream *s)
                    792: {
                    793:        Queue *q;
                    794:        Queue *nq;
                    795:        Sthash *hb;
                    796:        Stream **l, *ns;
                    797: 
                    798:        hb = hash(s->type, s->dev, s->id);
                    799:        qlock(hb);
                    800:        if(s->inuse-- == 1){
                    801:                if(s->opens != 0)
                    802:                        panic("streamexit %d %s\n", s->opens, s->devq->info->name);
                    803: 
                    804:                /*
                    805:                 *  ascend the stream freeing the queues
                    806:                 */
                    807:                for(q = s->devq; q; q = nq){
                    808:                        nq = q->next;
                    809:                        freeq(q);
                    810:                }
                    811:                if(s->err)
                    812:                        freeb(s->err);
                    813: 
                    814:                /*
                    815:                 *  unchain it from the hash bucket and free
                    816:                 */
                    817:                l = &hb->s;
                    818:                for(ns = hb->s; ns; ns = ns->next){
                    819:                        if(s == ns){
                    820:                                *l = s->next;
                    821:                                break;
                    822:                        }
                    823:                        l = &ns->next;
                    824:                }
                    825:                free(s);
                    826:        }
                    827:        qunlock(hb);
                    828: }
                    829: 
                    830: /*
                    831:  *  nail down a stream so that it can't be closed
                    832:  */
                    833: void
                    834: naildownstream(Stream *s)
                    835: {
                    836:        s->opens++;
                    837:        s->inuse++;
                    838: }
                    839: 
                    840: /*
                    841:  *  Decrement the open count.  When it goes to zero, call the close
                    842:  *  routines for each queue in the stream.
                    843:  */
                    844: int
                    845: streamclose1(Stream *s)
                    846: {
                    847:        Queue *q, *nq;
                    848:        int rv;
                    849: 
                    850:        /*
                    851:         *  decrement the open count
                    852:         */
                    853:        qlock(s);
                    854:        if(s->opens-- == 1){
                    855:                /*
                    856:                 *  descend the stream closing the queues
                    857:                 */
                    858:                for(q = s->procq; q; q = q->next){
                    859:                        if(!waserror()){
                    860:                                if(q->info->close)
                    861:                                        (*q->info->close)(q->other);
                    862:                                poperror();
                    863:                        }
                    864:                        WR(q)->put = nullput;
                    865: 
                    866:                        /*
                    867:                         *  this may be 2 streams joined device end to device end
                    868:                         */
                    869:                        if(q == s->devq->other)
                    870:                                break;
                    871:                }
                    872:        
                    873:                /*
                    874:                 *  ascend the stream flushing the queues
                    875:                 */
                    876:                for(q = s->devq; q; q = nq){
                    877:                        nq = q->next;
                    878:                        flushq(q);
                    879:                }
                    880:        }
                    881:        rv = s->opens;
                    882:        qunlock(s);
                    883: 
                    884:        /*
                    885:         *  leave it and free it
                    886:         */
                    887:        streamexit(s);
                    888:        return rv;
                    889: }
                    890: int
                    891: streamclose(Chan *c)
                    892: {
                    893:        /*
                    894:         *  if no stream, ignore it
                    895:         */
                    896:        if(!c->stream)
                    897:                return 0;
                    898:        return streamclose1(c->stream);
                    899: }
                    900: 
                    901: /*
                    902:  *  put a block to be read into the queue.  wakeup any waiting reader
                    903:  */
                    904: void
                    905: stputq(Queue *q, Block *bp)
                    906: {
                    907:        int awaken;
                    908:        Stream *s;
                    909: 
                    910:        if(bp->type == M_HANGUP){
                    911:                s = q->ptr;
                    912:                if(bp->rptr<bp->wptr && s->err==0)
                    913:                        s->err = bp;
                    914:                else
                    915:                        freeb(bp);
                    916:                q->flag |= QHUNGUP;
                    917:                q->other->flag |= QHUNGUP;
                    918:                wakeup(q->other->rp);
                    919:                awaken = 1;
                    920:        } else {
                    921:                lock(q);
                    922:                if(q->first)
                    923:                        q->last->next = bp;
                    924:                else
                    925:                        q->first = bp;
                    926:                q->len += BLEN(bp);
                    927:                q->nb++;
                    928:                awaken = bp->flags & S_DELIM;
                    929:                while(bp->next) {
                    930:                        bp = bp->next;
                    931:                        q->len += BLEN(bp);
                    932:                        q->nb++;
                    933:                        awaken |= bp->flags & S_DELIM;
                    934:                }
                    935:                q->last = bp;
                    936:                if(q->len >= Streamhi || q->nb >= Streambhi){
                    937:                        q->flag |= QHIWAT;
                    938:                        awaken = 1;
                    939:                }
                    940:                unlock(q);
                    941:        }
                    942:        if(awaken)
                    943:                wakeup(q->rp);
                    944: }
                    945: 
                    946: /*
                    947:  *  return the stream id
                    948:  */
                    949: long
                    950: streamctlread(Chan *c, void *vbuf, long n)
                    951: {
                    952:        char *buf = vbuf;
                    953:        char num[32];
                    954:        Stream *s;
                    955: 
                    956:        s = c->stream;
                    957:        if(STREAMTYPE(c->qid.path) == Sctlqid){
                    958:                sprint(num, "%d", s->id);
                    959:                return readstr(c->offset, buf, n, num);
                    960:        } else {
                    961:                if(CHDIR & c->qid.path)
                    962:                        return devdirread(c, vbuf, n, 0, 0, streamgen);
                    963:                else
                    964:                        panic("streamctlread");
                    965:        }
                    966:        return 0;       /* not reached */
                    967: }
                    968: 
                    969: /*
                    970:  *  return true if there is an output buffer available
                    971:  */
                    972: static int
                    973: isinput(void *x)
                    974: {
                    975:        Queue *q;
                    976: 
                    977:        q = (Queue *)x;
                    978:        return (q->flag&QHUNGUP) || q->first!=0;
                    979: }
                    980: 
                    981: /*
                    982:  *  read until we fill the buffer or until a DELIM is encountered
                    983:  */
                    984: long
                    985: streamread(Chan *c, void *vbuf, long n)
                    986: {
                    987:        Block *bp;
                    988:        Block *tofree;
                    989:        Stream *s;
                    990:        Queue *q;
                    991:        int left, i;
                    992:        uchar *buf = vbuf;
                    993: 
                    994:        if(STREAMTYPE(c->qid.path) != Sdataqid)
                    995:                return streamctlread(c, vbuf, n);
                    996: 
                    997:        /*
                    998:         *  one reader at a time
                    999:         */
                   1000:        s = c->stream;
                   1001:        left = n;
                   1002:        qlock(&s->rdlock);
                   1003:        tofree = 0;
                   1004:        q = 0;
                   1005:        if(waserror()){
                   1006:                /*
                   1007:                 *  put any partially read message back into the
                   1008:                 *  queue
                   1009:                 */
                   1010:                while(tofree){
                   1011:                        bp = tofree;
                   1012:                        tofree = bp->next;
                   1013:                        bp->next = 0;
                   1014:                        putbq(q, bp);
                   1015:                }
                   1016:                qunlock(&s->rdlock);
                   1017:                nexterror();
                   1018:        }
                   1019: 
                   1020:        /*
                   1021:         *  sleep till data is available
                   1022:         */
                   1023:        q = RD(s->procq);
                   1024:        while(left){
                   1025:                bp = getq(q);
                   1026:                if(bp == 0){
                   1027:                        if(q->flag & QHUNGUP){
                   1028:                                if(s->err)
                   1029:                                        error((char*)s->err->rptr);
                   1030:                                else if(s->hread++<3)
                   1031:                                        break;
                   1032:                                else
                   1033:                                        error(Ehungup);
                   1034:                        }
                   1035:                        q->rp = &q->r;
                   1036:                        sleep(q->rp, isinput, (void *)q);
                   1037:                        continue;
                   1038:                }
                   1039: 
                   1040:                i = BLEN(bp);
                   1041:                if(i <= left){
                   1042:                        memmove(buf, bp->rptr, i);
                   1043:                        left -= i;
                   1044:                        buf += i;
                   1045:                        bp->next = tofree;
                   1046:                        tofree = bp;
                   1047:                        if(bp->flags & S_DELIM)
                   1048:                                break;
                   1049:                } else {
                   1050:                        memmove(buf, bp->rptr, left);
                   1051:                        bp->rptr += left;
                   1052:                        putbq(q, bp);
                   1053:                        left = 0;
                   1054:                }
                   1055:        }
                   1056: 
                   1057:        /*
                   1058:         *  free completely read blocks
                   1059:         */
                   1060:        if(tofree)
                   1061:                freeb(tofree);
                   1062: 
                   1063:        qunlock(&s->rdlock);
                   1064:        poperror();
                   1065:        return n - left;        
                   1066: }
                   1067: 
                   1068: /*
                   1069:  *  look for an instance of the line discipline `name' on
                   1070:  *  the stream `s'
                   1071:  */
                   1072: void
                   1073: qlook(Stream *s, char *name)
                   1074: {
                   1075:        Queue *q;
                   1076: 
                   1077:        for(q = s->procq; q; q = q->next){
                   1078:                if(strcmp(q->info->name, name) == 0)
                   1079:                        return;
                   1080: 
                   1081:                /*
                   1082:                 *  this may be 2 streams joined device end to device end
                   1083:                 */
                   1084:                if(q == s->devq->other)
                   1085:                        break;
                   1086:        }
                   1087:        error(Ebadarg);
                   1088: }
                   1089: 
                   1090: /*
                   1091:  *  Handle a ctl request.  Streamwide requests are:
                   1092:  *
                   1093:  *     hangup                  -- send an M_HANGUP up the stream
                   1094:  *     push ldname             -- push the line discipline named ldname
                   1095:  *     pop                     -- pop a line discipline
                   1096:  *     look ldname             -- look for a line discipline
                   1097:  *
                   1098:  *  This routing is entered with s->wrlock'ed and must unlock.
                   1099:  */
                   1100: static long
                   1101: streamctlwrite(Chan *c, void *a, long n)
                   1102: {
                   1103:        Qinfo *qi;
                   1104:        Block *bp;
                   1105:        Stream *s;
                   1106: 
                   1107:        if(STREAMTYPE(c->qid.path) != Sctlqid)
                   1108:                panic("streamctlwrite %lux", c->qid);
                   1109:        s = c->stream;
                   1110: 
                   1111:        /*
                   1112:         *  package
                   1113:         */
                   1114:        bp = allocb(n+1);
                   1115:        memmove(bp->wptr, a, n);
                   1116:        bp->wptr[n] = 0;
                   1117:        bp->wptr += n + 1;
                   1118: 
                   1119:        /*
                   1120:         *  check for standard requests
                   1121:         */
                   1122:        if(streamparse("hangup", bp)){
                   1123:                hangup(s);
                   1124:                freeb(bp);
                   1125:        } else if(streamparse("push", bp)){
                   1126:                qi = qinfofind((char *)bp->rptr);
                   1127:                if(qi == 0)
                   1128:                        error(Ebadld);
                   1129:                pushq(s, qi);
                   1130:                freeb(bp);
                   1131:        } else if(streamparse("pop", bp)){
                   1132:                popq(s);
                   1133:                freeb(bp);
                   1134:        } else if(streamparse("look", bp)){
                   1135:                qlook(s, (char *)bp->rptr);
                   1136:                freeb(bp);
                   1137:        } else {
                   1138:                bp->type = M_CTL;
                   1139:                bp->flags |= S_DELIM;
                   1140:                PUTNEXT(s->procq, bp);
                   1141:        }
                   1142: 
                   1143:        return n;
                   1144: }
                   1145: 
                   1146: /*
                   1147:  *  wait till there's room in the next stream
                   1148:  */
                   1149: static int
                   1150: notfull(void *arg)
                   1151: {
                   1152:        return !QFULL((Queue *)arg);
                   1153: }
                   1154: void
                   1155: flowctl(Queue *q, Block *bp)
                   1156: {
                   1157:        if(bp->type != M_HANGUP){
                   1158:                qlock(&q->rlock);
                   1159:                if(waserror()){
                   1160:                        qunlock(&q->rlock);
                   1161:                        freeb(bp);
                   1162:                        nexterror();
                   1163:                }
                   1164:                q->rp = &q->r;
                   1165:                sleep(q->rp, notfull, q->next);
                   1166:                qunlock(&q->rlock);
                   1167:                poperror();
                   1168:        }
                   1169:        PUTNEXT(q, bp);
                   1170: }
                   1171: 
                   1172: /*
                   1173:  *  send the request as a single delimited block
                   1174:  */
                   1175: long
                   1176: streamwrite(Chan *c, void *a, long n, int docopy)
                   1177: {
                   1178:        Stream *s;
                   1179:        Queue *q;
                   1180:        long rem;
                   1181:        int i;
                   1182:        Block *bp;
                   1183:        char *va;
                   1184: 
                   1185:        /*
                   1186:         *  docopy will get used if I ever figure out when to avoid copying
                   1187:         *  data. -- presotto
                   1188:         */
                   1189:        USED(docopy);
                   1190: 
                   1191:        s = c->stream;
                   1192: 
                   1193:        /*
                   1194:         *  decode the qid
                   1195:         */
                   1196:        if(STREAMTYPE(c->qid.path) != Sdataqid)
                   1197:                return streamctlwrite(c, a, n);
                   1198: 
                   1199:        /*
                   1200:         *  No writes allowed on hungup channels
                   1201:         */
                   1202:        q = s->procq;
                   1203:        if(q->other->flag & QHUNGUP){
                   1204:                if(s->err)
                   1205:                        error((char*)(s->err->rptr));
                   1206:                else
                   1207:                        error(Ehungup);
                   1208:        }
                   1209: 
                   1210:        /*
                   1211:         *  Write the message using blocks <= Streamhi bytes longs
                   1212:         */
                   1213:        va = a;
                   1214:        rem = n;
                   1215:        for(;;){
                   1216:                if(rem > Streamhi)
                   1217:                        i = Streamhi;
                   1218:                else
                   1219:                        i = rem;
                   1220:                bp = allocb(i);
                   1221:                memmove(bp->wptr, va, i);
                   1222:                bp->wptr += i;
                   1223:                va += i;
                   1224:                rem -= i;
                   1225:                if(rem > 0){
                   1226:                        FLOWCTL(q, bp);
                   1227:                } else {
                   1228:                        bp->flags |= S_DELIM;
                   1229:                        FLOWCTL(q, bp);
                   1230:                        break;
                   1231:                }
                   1232:        }
                   1233:        return n;
                   1234: }
                   1235: 
                   1236: /*
                   1237:  *  stat a stream.  the length is the number of bytes up to the
                   1238:  *  first delimiter.
                   1239:  */
                   1240: void
                   1241: streamstat(Chan *c, char *db, char *name, long perm)
                   1242: {
                   1243:        Dir dir;
                   1244:        Stream *s;
                   1245:        Queue *q;
                   1246:        Block *bp;
                   1247:        long n;
                   1248: 
                   1249:        s = c->stream;
                   1250:        n = 0;
                   1251:        if(s) {
                   1252:                q = RD(s->procq);
                   1253:                if(q->flag & QHUNGUP)
                   1254:                        error(Ehungup);
                   1255:                lock(q);
                   1256:                for(bp=q->first; bp; bp = bp->next){
                   1257:                        n += BLEN(bp);
                   1258:                        if(bp->flags&S_DELIM)
                   1259:                                break;
                   1260:                }
                   1261:                unlock(q);
                   1262:        }
                   1263: 
                   1264:        devdir(c, c->qid, name, n, eve, perm, &dir);
                   1265:        convD2M(&dir, db);
                   1266: }
                   1267: 
                   1268: /*
                   1269:  *  send a hangup up a stream
                   1270:  */
                   1271: static void
                   1272: hangup(Stream *s)
                   1273: {
                   1274:        Block *bp;
                   1275: 
                   1276:        bp = allocb(0);
                   1277:        bp->type = M_HANGUP;
                   1278:        if(s->devq && s->devq->put)
                   1279:                (*s->devq->put)(s->devq, bp);
                   1280:        else
                   1281:                freeb(bp);
                   1282: }
                   1283: 
                   1284: /*
                   1285:  *  parse a string and return a pointer to the second element if the 
                   1286:  *  first matches name.  bp->rptr will be updated to point to the
                   1287:  *  second element.
                   1288:  *
                   1289:  *  return 0 if no match.
                   1290:  *
                   1291:  *  it is assumed that the block data is null terminated.  streamwrite
                   1292:  *  guarantees this.
                   1293:  */
                   1294: int
                   1295: streamparse(char *name, Block *bp)
                   1296: {
                   1297:        int len;
                   1298: 
                   1299:        len = strlen(name);
                   1300:        if(BLEN(bp) < len)
                   1301:                return 0;
                   1302:        if(strncmp(name, (char *)bp->rptr, len)==0){
                   1303:                if(bp->rptr[len] == ' ')
                   1304:                        bp->rptr += len+1;
                   1305:                else if(bp->rptr[len])
                   1306:                        return 0;
                   1307:                else
                   1308:                        bp->rptr += len;
                   1309:                while(*bp->rptr==' ' && bp->wptr>bp->rptr)
                   1310:                        bp->rptr++;
                   1311:                return 1;
                   1312:        }
                   1313:        return 0;
                   1314: }
                   1315: 
                   1316: /*
                   1317:  *  like andrew's getmfields but no hidden state
                   1318:  */
                   1319: int
                   1320: getfields(char *lp, char **fields, int n, char *sep)
                   1321: {
                   1322:        int i;
                   1323: 
                   1324:        for(i=0; lp && *lp && i<n; i++){
                   1325:                while(*lp && strchr(sep, *lp) != 0)
                   1326:                        *lp++=0;
                   1327:                if(*lp == 0)
                   1328:                        break;
                   1329:                fields[i]=lp;
                   1330:                while(*lp && strchr(sep, *lp) == 0)
                   1331:                        lp++;
                   1332:        }
                   1333:        return i;
                   1334: }
                   1335: 
                   1336: static Streamopen      permstopen;
                   1337: static Streamput       permstput;
                   1338: 
                   1339: /*
                   1340:  *  pushing this line discipline makes the stream unclosable, i.e., always there
                   1341:  */
                   1342: Qinfo perminfo =
                   1343: {
                   1344:        permstput,
                   1345:        permstput,
                   1346:        permstopen,
                   1347:        0,
                   1348:        "permanent"
                   1349: };
                   1350: 
                   1351: static void
                   1352: permstopen(Queue *q, Stream *s)
                   1353: {
                   1354:        USED(q);
                   1355:        s->opens++;
                   1356:        s->inuse++;
                   1357: }
                   1358: 
                   1359: static void
                   1360: permstput(Queue *q, Block *bp)
                   1361: {
                   1362:        PUTNEXT(q, bp);
                   1363: }

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.