|
|
1.1 ! root 1: #include "u.h" ! 2: #include "../port/lib.h" ! 3: #include "mem.h" ! 4: #include "dat.h" ! 5: #include "fns.h" ! 6: #include "../port/error.h" ! 7: ! 8: #include "devtab.h" ! 9: ! 10: typedef struct Pipe Pipe; ! 11: struct Pipe ! 12: { ! 13: Ref; ! 14: QLock; ! 15: Pipe *next; ! 16: ulong path; ! 17: }; ! 18: ! 19: struct ! 20: { ! 21: Lock; ! 22: Pipe *pipe; ! 23: ulong path; ! 24: } pipealloc; ! 25: ! 26: static Pipe *getpipe(ulong); ! 27: static void pipeiput(Queue*, Block*); ! 28: static void pipeoput(Queue*, Block*); ! 29: static void pipestclose(Queue *); ! 30: ! 31: Qinfo pipeinfo = ! 32: { ! 33: pipeiput, ! 34: pipeoput, ! 35: 0, ! 36: pipestclose, ! 37: "pipe" ! 38: }; ! 39: ! 40: Dirtab pipedir[] = ! 41: { ! 42: "data", {Sdataqid}, 0, 0600, ! 43: "ctl", {Sctlqid}, 0, 0600, ! 44: "data1", {Sdataqid}, 0, 0600, ! 45: "ctl1", {Sctlqid}, 0, 0600, ! 46: }; ! 47: #define NPIPEDIR 4 ! 48: ! 49: void ! 50: pipeinit(void) ! 51: { ! 52: } ! 53: ! 54: void ! 55: pipereset(void) ! 56: { ! 57: } ! 58: ! 59: /* ! 60: * create a pipe, no streams are created until an open ! 61: */ ! 62: Chan* ! 63: pipeattach(char *spec) ! 64: { ! 65: Pipe *p; ! 66: Chan *c; ! 67: ! 68: c = devattach('|', spec); ! 69: p = smalloc(sizeof(Pipe)); ! 70: p->ref = 1; ! 71: ! 72: lock(&pipealloc); ! 73: p->path = ++pipealloc.path; ! 74: p->next = pipealloc.pipe; ! 75: pipealloc.pipe = p; ! 76: unlock(&pipealloc); ! 77: ! 78: c->qid = (Qid){CHDIR|STREAMQID(2*p->path, 0), 0}; ! 79: c->dev = 0; ! 80: return c; ! 81: } ! 82: ! 83: Chan* ! 84: pipeclone(Chan *c, Chan *nc) ! 85: { ! 86: Pipe *p; ! 87: ! 88: p = getpipe(STREAMID(c->qid.path)/2); ! 89: nc = devclone(c, nc); ! 90: if(incref(p) <= 1) ! 91: panic("pipeclone"); ! 92: return nc; ! 93: } ! 94: ! 95: int ! 96: pipegen(Chan *c, Dirtab *tab, int ntab, int i, Dir *dp) ! 97: { ! 98: int id; ! 99: ! 100: id = STREAMID(c->qid.path); ! 101: if(i > 1) ! 102: id++; ! 103: if(tab==0 || i>=ntab) ! 104: return -1; ! 105: tab += i; ! 106: devdir(c, (Qid){STREAMQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp); ! 107: return 1; ! 108: } ! 109: ! 110: ! 111: int ! 112: pipewalk(Chan *c, char *name) ! 113: { ! 114: return devwalk(c, name, pipedir, NPIPEDIR, pipegen); ! 115: } ! 116: ! 117: void ! 118: pipestat(Chan *c, char *db) ! 119: { ! 120: streamstat(c, db, "pipe", 0666); ! 121: } ! 122: ! 123: /* ! 124: * if the stream doesn't exist, create it ! 125: */ ! 126: Chan * ! 127: pipeopen(Chan *c, int omode) ! 128: { ! 129: Pipe *p; ! 130: int other; ! 131: Stream *local, *remote; ! 132: ! 133: if(c->qid.path & CHDIR){ ! 134: if(omode != OREAD) ! 135: error(Ebadarg); ! 136: c->mode = omode; ! 137: c->flag |= COPEN; ! 138: c->offset = 0; ! 139: return c; ! 140: } ! 141: ! 142: p = getpipe(STREAMID(c->qid.path)/2); ! 143: if(waserror()){ ! 144: qunlock(p); ! 145: nexterror(); ! 146: } ! 147: qlock(p); ! 148: streamopen(c, &pipeinfo); ! 149: local = c->stream; ! 150: if(local->devq->ptr == 0){ ! 151: /* ! 152: * first open, create the other end also ! 153: */ ! 154: other = STREAMID(c->qid.path)^1; ! 155: remote = streamnew(c->type, c->dev, other, &pipeinfo,1); ! 156: ! 157: /* ! 158: * connect the device ends of both streams ! 159: */ ! 160: local->devq->ptr = remote; ! 161: remote->devq->ptr = local; ! 162: local->devq->other->next = remote->devq; ! 163: remote->devq->other->next = local->devq; ! 164: } else if(local->opens == 1){ ! 165: /* ! 166: * keep other side around till last close of this side ! 167: */ ! 168: streamenter(local->devq->ptr); ! 169: } ! 170: qunlock(p); ! 171: poperror(); ! 172: ! 173: c->mode = openmode(omode); ! 174: c->flag |= COPEN; ! 175: c->offset = 0; ! 176: return c; ! 177: } ! 178: ! 179: void ! 180: pipecreate(Chan *c, char *name, int omode, ulong perm) ! 181: { ! 182: USED(c, name, omode, perm); ! 183: error(Egreg); ! 184: } ! 185: ! 186: void ! 187: piperemove(Chan *c) ! 188: { ! 189: USED(c); ! 190: error(Egreg); ! 191: } ! 192: ! 193: void ! 194: pipewstat(Chan *c, char *db) ! 195: { ! 196: USED(c, db); ! 197: error(Eperm); ! 198: } ! 199: ! 200: void ! 201: pipeclose(Chan *c) ! 202: { ! 203: Pipe *p, *f, **l; ! 204: Stream *remote; ! 205: ! 206: p = getpipe(STREAMID(c->qid.path)/2); ! 207: ! 208: /* ! 209: * take care of local and remote streams ! 210: */ ! 211: if(c->stream){ ! 212: qlock(p); ! 213: remote = c->stream->devq->ptr; ! 214: if(streamclose(c) == 0){ ! 215: if(remote) ! 216: streamexit(remote); ! 217: } ! 218: qunlock(p); ! 219: } ! 220: ! 221: /* ! 222: * free the structure ! 223: */ ! 224: if(decref(p) == 0){ ! 225: lock(&pipealloc); ! 226: l = &pipealloc.pipe; ! 227: for(f = *l; f; f = f->next) { ! 228: if(f == p) { ! 229: *l = p->next; ! 230: break; ! 231: } ! 232: l = &f->next; ! 233: } ! 234: unlock(&pipealloc); ! 235: free(p); ! 236: } ! 237: } ! 238: ! 239: long ! 240: piperead(Chan *c, void *va, long n, ulong offset) ! 241: { ! 242: USED(offset); ! 243: if(c->qid.path & CHDIR) ! 244: return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen); ! 245: ! 246: return streamread(c, va, n); ! 247: } ! 248: ! 249: /* ! 250: * a write to a closed pipe causes a note to be sent to ! 251: * the process. ! 252: */ ! 253: long ! 254: pipewrite(Chan *c, void *va, long n, ulong offset) ! 255: { ! 256: USED(offset); ! 257: ! 258: /* avoid notes when pipe is a mounted stream */ ! 259: if(c->flag & CMSG) ! 260: return streamwrite(c, va, n, 0); ! 261: ! 262: if(waserror()) { ! 263: postnote(u->p, 1, "sys: write on closed pipe", NUser); ! 264: error(Ehungup); ! 265: } ! 266: n = streamwrite(c, va, n, 0); ! 267: poperror(); ! 268: return n; ! 269: } ! 270: ! 271: /* ! 272: * send a block upstream to the process. ! 273: * sleep until there's room upstream. ! 274: */ ! 275: static void ! 276: pipeiput(Queue *q, Block *bp) ! 277: { ! 278: FLOWCTL(q, bp); ! 279: } ! 280: ! 281: /* ! 282: * send the block to the other side ! 283: */ ! 284: static void ! 285: pipeoput(Queue *q, Block *bp) ! 286: { ! 287: PUTNEXT(q, bp); ! 288: } ! 289: ! 290: /* ! 291: * send a hangup and disconnect the streams ! 292: */ ! 293: static void ! 294: pipestclose(Queue *q) ! 295: { ! 296: Block *bp; ! 297: ! 298: /* ! 299: * point to the bit-bucket and let any in-progress ! 300: * write's finish. ! 301: */ ! 302: q->put = nullput; ! 303: wakeup(&q->r); ! 304: ! 305: /* ! 306: * send a hangup ! 307: */ ! 308: q = q->other; ! 309: if(q->next == 0) ! 310: return; ! 311: bp = allocb(0); ! 312: bp->type = M_HANGUP; ! 313: PUTNEXT(q, bp); ! 314: } ! 315: ! 316: Pipe* ! 317: getpipe(ulong path) ! 318: { ! 319: Pipe *p; ! 320: ! 321: lock(&pipealloc); ! 322: for(p = pipealloc.pipe; p; p = p->next) { ! 323: if(path == p->path) { ! 324: unlock(&pipealloc); ! 325: return p; ! 326: } ! 327: } ! 328: unlock(&pipealloc); ! 329: panic("getpipe"); ! 330: return 0; /* not reached */ ! 331: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.