Annotation of lucent/sys/src/9/port/devpipe.c, revision 1.1.1.1

1.1       root        1: #include       "u.h"
                      2: #include       "../port/lib.h"
                      3: #include       "mem.h"
                      4: #include       "dat.h"
                      5: #include       "fns.h"
                      6: #include       "../port/error.h"
                      7: 
                      8: #include       "devtab.h"
                      9: 
                     10: typedef struct Pipe    Pipe;
                     11: struct Pipe
                     12: {
                     13:        Ref;
                     14:        QLock;
                     15:        Pipe    *next;
                     16:        ulong   path;
                     17: };
                     18: 
                     19: struct
                     20: {
                     21:        Lock;
                     22:        Pipe    *pipe;
                     23:        ulong   path;
                     24: } pipealloc;
                     25: 
                     26: static Pipe *getpipe(ulong);
                     27: static void pipeiput(Queue*, Block*);
                     28: static void pipeoput(Queue*, Block*);
                     29: static void pipestclose(Queue *);
                     30: 
                     31: Qinfo pipeinfo =
                     32: {
                     33:        pipeiput,
                     34:        pipeoput,
                     35:        0,
                     36:        pipestclose,
                     37:        "pipe"
                     38: };
                     39: 
                     40: Dirtab pipedir[] =
                     41: {
                     42:        "data",         {Sdataqid},     0,                      0600,
                     43:        "ctl",          {Sctlqid},      0,                      0600,
                     44:        "data1",        {Sdataqid},     0,                      0600,
                     45:        "ctl1",         {Sctlqid},      0,                      0600,
                     46: };
                     47: #define NPIPEDIR 4
                     48: 
                     49: void
                     50: pipeinit(void)
                     51: {
                     52: }
                     53: 
                     54: void
                     55: pipereset(void)
                     56: {
                     57: }
                     58: 
                     59: /*
                     60:  *  create a pipe, no streams are created until an open
                     61:  */
                     62: Chan*
                     63: pipeattach(char *spec)
                     64: {
                     65:        Pipe *p;
                     66:        Chan *c;
                     67: 
                     68:        c = devattach('|', spec);
                     69:        p = smalloc(sizeof(Pipe));
                     70:        p->ref = 1;
                     71: 
                     72:        lock(&pipealloc);
                     73:        p->path = ++pipealloc.path;
                     74:        p->next = pipealloc.pipe;
                     75:        pipealloc.pipe = p;
                     76:        unlock(&pipealloc);
                     77: 
                     78:        c->qid = (Qid){CHDIR|STREAMQID(2*p->path, 0), 0};
                     79:        c->dev = 0;
                     80:        return c;
                     81: }
                     82: 
                     83: Chan*
                     84: pipeclone(Chan *c, Chan *nc)
                     85: {
                     86:        Pipe *p;
                     87: 
                     88:        p = getpipe(STREAMID(c->qid.path)/2);
                     89:        nc = devclone(c, nc);
                     90:        if(incref(p) <= 1)
                     91:                panic("pipeclone");
                     92:        return nc;
                     93: }
                     94: 
                     95: int
                     96: pipegen(Chan *c, Dirtab *tab, int ntab, int i, Dir *dp)
                     97: {
                     98:        int id;
                     99: 
                    100:        id = STREAMID(c->qid.path);
                    101:        if(i > 1)
                    102:                id++;
                    103:        if(tab==0 || i>=ntab)
                    104:                return -1;
                    105:        tab += i;
                    106:        devdir(c, (Qid){STREAMQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp);
                    107:        return 1;
                    108: }
                    109: 
                    110: 
                    111: int
                    112: pipewalk(Chan *c, char *name)
                    113: {
                    114:        return devwalk(c, name, pipedir, NPIPEDIR, pipegen);
                    115: }
                    116: 
                    117: void
                    118: pipestat(Chan *c, char *db)
                    119: {
                    120:        streamstat(c, db, "pipe", 0666);
                    121: }
                    122: 
                    123: /*
                    124:  *  if the stream doesn't exist, create it
                    125:  */
                    126: Chan *
                    127: pipeopen(Chan *c, int omode)
                    128: {
                    129:        Pipe *p;
                    130:        int other;
                    131:        Stream *local, *remote;
                    132: 
                    133:        if(c->qid.path & CHDIR){
                    134:                if(omode != OREAD)
                    135:                        error(Ebadarg);
                    136:                c->mode = omode;
                    137:                c->flag |= COPEN;
                    138:                c->offset = 0;
                    139:                return c;
                    140:        }
                    141: 
                    142:        p = getpipe(STREAMID(c->qid.path)/2);
                    143:        if(waserror()){
                    144:                qunlock(p);
                    145:                nexterror();
                    146:        }
                    147:        qlock(p);
                    148:        streamopen(c, &pipeinfo);
                    149:        local = c->stream;
                    150:        if(local->devq->ptr == 0){
                    151:                /*
                    152:                 *  first open, create the other end also
                    153:                 */
                    154:                other = STREAMID(c->qid.path)^1;
                    155:                remote = streamnew(c->type, c->dev, other, &pipeinfo,1);
                    156: 
                    157:                /*
                    158:                 *  connect the device ends of both streams
                    159:                 */
                    160:                local->devq->ptr = remote;
                    161:                remote->devq->ptr = local;
                    162:                local->devq->other->next = remote->devq;
                    163:                remote->devq->other->next = local->devq;
                    164:        } else if(local->opens == 1){
                    165:                /*
                    166:                 *  keep other side around till last close of this side
                    167:                 */
                    168:                streamenter(local->devq->ptr);
                    169:        }
                    170:        qunlock(p);
                    171:        poperror();
                    172: 
                    173:        c->mode = openmode(omode);
                    174:        c->flag |= COPEN;
                    175:        c->offset = 0;
                    176:        return c;
                    177: }
                    178: 
                    179: void
                    180: pipecreate(Chan *c, char *name, int omode, ulong perm)
                    181: {
                    182:        USED(c, name, omode, perm);
                    183:        error(Egreg);
                    184: }
                    185: 
                    186: void
                    187: piperemove(Chan *c)
                    188: {
                    189:        USED(c);
                    190:        error(Egreg);
                    191: }
                    192: 
                    193: void
                    194: pipewstat(Chan *c, char *db)
                    195: {
                    196:        USED(c, db);
                    197:        error(Eperm);
                    198: }
                    199: 
                    200: void
                    201: pipeclose(Chan *c)
                    202: {
                    203:        Pipe *p, *f, **l;
                    204:        Stream *remote;
                    205: 
                    206:        p = getpipe(STREAMID(c->qid.path)/2);
                    207: 
                    208:        /*
                    209:         *  take care of local and remote streams
                    210:         */
                    211:        if(c->stream){
                    212:                qlock(p);
                    213:                remote = c->stream->devq->ptr;
                    214:                if(streamclose(c) == 0){
                    215:                        if(remote)
                    216:                                streamexit(remote);
                    217:                }
                    218:                qunlock(p);
                    219:        }
                    220: 
                    221:        /*
                    222:         *  free the structure
                    223:         */
                    224:        if(decref(p) == 0){
                    225:                lock(&pipealloc);
                    226:                l = &pipealloc.pipe;
                    227:                for(f = *l; f; f = f->next) {
                    228:                        if(f == p) {
                    229:                                *l = p->next;
                    230:                                break;
                    231:                        }
                    232:                        l = &f->next;
                    233:                }
                    234:                unlock(&pipealloc);
                    235:                free(p);
                    236:        }
                    237: }
                    238: 
                    239: long
                    240: piperead(Chan *c, void *va, long n, ulong offset)
                    241: {
                    242:        USED(offset);
                    243:        if(c->qid.path & CHDIR)
                    244:                return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
                    245: 
                    246:        return streamread(c, va, n);
                    247: }
                    248: 
                    249: /*
                    250:  *  a write to a closed pipe causes a note to be sent to
                    251:  *  the process.
                    252:  */
                    253: long
                    254: pipewrite(Chan *c, void *va, long n, ulong offset)
                    255: {
                    256:        USED(offset);
                    257: 
                    258:        /* avoid notes when pipe is a mounted stream */
                    259:        if(c->flag & CMSG)
                    260:                return streamwrite(c, va, n, 0);
                    261: 
                    262:        if(waserror()) {
                    263:                postnote(u->p, 1, "sys: write on closed pipe", NUser);
                    264:                error(Ehungup);
                    265:        }
                    266:        n = streamwrite(c, va, n, 0);
                    267:        poperror();
                    268:        return n;
                    269: }
                    270: 
                    271: /*
                    272:  *  send a block upstream to the process.
                    273:  *  sleep until there's room upstream.
                    274:  */
                    275: static void
                    276: pipeiput(Queue *q, Block *bp)
                    277: {
                    278:        FLOWCTL(q, bp);
                    279: }
                    280: 
                    281: /*
                    282:  *  send the block to the other side
                    283:  */
                    284: static void
                    285: pipeoput(Queue *q, Block *bp)
                    286: {
                    287:        PUTNEXT(q, bp);
                    288: }
                    289: 
                    290: /*
                    291:  *  send a hangup and disconnect the streams
                    292:  */
                    293: static void
                    294: pipestclose(Queue *q)
                    295: {
                    296:        Block *bp;
                    297: 
                    298:        /*
                    299:         *  point to the bit-bucket and let any in-progress
                    300:         *  write's finish.
                    301:         */
                    302:        q->put = nullput;
                    303:        wakeup(&q->r);
                    304: 
                    305:        /*
                    306:         *  send a hangup
                    307:         */
                    308:        q = q->other;
                    309:        if(q->next == 0)
                    310:                return;
                    311:        bp = allocb(0);
                    312:        bp->type = M_HANGUP;
                    313:        PUTNEXT(q, bp);
                    314: }
                    315: 
                    316: Pipe*
                    317: getpipe(ulong path)
                    318: {
                    319:        Pipe *p;
                    320: 
                    321:        lock(&pipealloc);
                    322:        for(p = pipealloc.pipe; p; p = p->next) {
                    323:                if(path == p->path) {
                    324:                        unlock(&pipealloc);
                    325:                        return p;
                    326:                }
                    327:        }
                    328:        unlock(&pipealloc);
                    329:        panic("getpipe");
                    330:        return 0;               /* not reached */
                    331: }

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.