|
|
1.1 root 1: #include "u.h"
2: #include "../port/lib.h"
3: #include "mem.h"
4: #include "dat.h"
5: #include "fns.h"
6: #include "../port/error.h"
7:
8: #include "devtab.h"
9:
10: typedef struct Pipe Pipe;
11: struct Pipe
12: {
13: Ref;
14: QLock;
15: Pipe *next;
16: ulong path;
17: };
18:
19: struct
20: {
21: Lock;
22: Pipe *pipe;
23: ulong path;
24: } pipealloc;
25:
26: static Pipe *getpipe(ulong);
27: static void pipeiput(Queue*, Block*);
28: static void pipeoput(Queue*, Block*);
29: static void pipestclose(Queue *);
30:
31: Qinfo pipeinfo =
32: {
33: pipeiput,
34: pipeoput,
35: 0,
36: pipestclose,
37: "pipe"
38: };
39:
40: Dirtab pipedir[] =
41: {
42: "data", {Sdataqid}, 0, 0600,
43: "ctl", {Sctlqid}, 0, 0600,
44: "data1", {Sdataqid}, 0, 0600,
45: "ctl1", {Sctlqid}, 0, 0600,
46: };
47: #define NPIPEDIR 4
48:
49: void
50: pipeinit(void)
51: {
52: }
53:
54: void
55: pipereset(void)
56: {
57: }
58:
59: /*
60: * create a pipe, no streams are created until an open
61: */
62: Chan*
63: pipeattach(char *spec)
64: {
65: Pipe *p;
66: Chan *c;
67:
68: c = devattach('|', spec);
69: p = smalloc(sizeof(Pipe));
70: p->ref = 1;
71:
72: lock(&pipealloc);
73: p->path = ++pipealloc.path;
74: p->next = pipealloc.pipe;
75: pipealloc.pipe = p;
76: unlock(&pipealloc);
77:
78: c->qid = (Qid){CHDIR|STREAMQID(2*p->path, 0), 0};
79: c->dev = 0;
80: return c;
81: }
82:
83: Chan*
84: pipeclone(Chan *c, Chan *nc)
85: {
86: Pipe *p;
87:
88: p = getpipe(STREAMID(c->qid.path)/2);
89: nc = devclone(c, nc);
90: if(incref(p) <= 1)
91: panic("pipeclone");
92: return nc;
93: }
94:
95: int
96: pipegen(Chan *c, Dirtab *tab, int ntab, int i, Dir *dp)
97: {
98: int id;
99:
100: id = STREAMID(c->qid.path);
101: if(i > 1)
102: id++;
103: if(tab==0 || i>=ntab)
104: return -1;
105: tab += i;
106: devdir(c, (Qid){STREAMQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp);
107: return 1;
108: }
109:
110:
111: int
112: pipewalk(Chan *c, char *name)
113: {
114: return devwalk(c, name, pipedir, NPIPEDIR, pipegen);
115: }
116:
117: void
118: pipestat(Chan *c, char *db)
119: {
120: streamstat(c, db, "pipe", 0666);
121: }
122:
123: /*
124: * if the stream doesn't exist, create it
125: */
126: Chan *
127: pipeopen(Chan *c, int omode)
128: {
129: Pipe *p;
130: int other;
131: Stream *local, *remote;
132:
133: if(c->qid.path & CHDIR){
134: if(omode != OREAD)
135: error(Ebadarg);
136: c->mode = omode;
137: c->flag |= COPEN;
138: c->offset = 0;
139: return c;
140: }
141:
142: p = getpipe(STREAMID(c->qid.path)/2);
143: if(waserror()){
144: qunlock(p);
145: nexterror();
146: }
147: qlock(p);
148: streamopen(c, &pipeinfo);
149: local = c->stream;
150: if(local->devq->ptr == 0){
151: /*
152: * first open, create the other end also
153: */
154: other = STREAMID(c->qid.path)^1;
155: remote = streamnew(c->type, c->dev, other, &pipeinfo,1);
156:
157: /*
158: * connect the device ends of both streams
159: */
160: local->devq->ptr = remote;
161: remote->devq->ptr = local;
162: local->devq->other->next = remote->devq;
163: remote->devq->other->next = local->devq;
164: } else if(local->opens == 1){
165: /*
166: * keep other side around till last close of this side
167: */
168: streamenter(local->devq->ptr);
169: }
170: qunlock(p);
171: poperror();
172:
173: c->mode = openmode(omode);
174: c->flag |= COPEN;
175: c->offset = 0;
176: return c;
177: }
178:
179: void
180: pipecreate(Chan *c, char *name, int omode, ulong perm)
181: {
182: USED(c, name, omode, perm);
183: error(Egreg);
184: }
185:
186: void
187: piperemove(Chan *c)
188: {
189: USED(c);
190: error(Egreg);
191: }
192:
193: void
194: pipewstat(Chan *c, char *db)
195: {
196: USED(c, db);
197: error(Eperm);
198: }
199:
200: void
201: pipeclose(Chan *c)
202: {
203: Pipe *p, *f, **l;
204: Stream *remote;
205:
206: p = getpipe(STREAMID(c->qid.path)/2);
207:
208: /*
209: * take care of local and remote streams
210: */
211: if(c->stream){
212: qlock(p);
213: remote = c->stream->devq->ptr;
214: if(streamclose(c) == 0){
215: if(remote)
216: streamexit(remote);
217: }
218: qunlock(p);
219: }
220:
221: /*
222: * free the structure
223: */
224: if(decref(p) == 0){
225: lock(&pipealloc);
226: l = &pipealloc.pipe;
227: for(f = *l; f; f = f->next) {
228: if(f == p) {
229: *l = p->next;
230: break;
231: }
232: l = &f->next;
233: }
234: unlock(&pipealloc);
235: free(p);
236: }
237: }
238:
239: long
240: piperead(Chan *c, void *va, long n, ulong offset)
241: {
242: USED(offset);
243: if(c->qid.path & CHDIR)
244: return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
245:
246: return streamread(c, va, n);
247: }
248:
249: /*
250: * a write to a closed pipe causes a note to be sent to
251: * the process.
252: */
253: long
254: pipewrite(Chan *c, void *va, long n, ulong offset)
255: {
256: USED(offset);
257:
258: /* avoid notes when pipe is a mounted stream */
259: if(c->flag & CMSG)
260: return streamwrite(c, va, n, 0);
261:
262: if(waserror()) {
263: postnote(u->p, 1, "sys: write on closed pipe", NUser);
264: error(Ehungup);
265: }
266: n = streamwrite(c, va, n, 0);
267: poperror();
268: return n;
269: }
270:
271: /*
272: * send a block upstream to the process.
273: * sleep until there's room upstream.
274: */
275: static void
276: pipeiput(Queue *q, Block *bp)
277: {
278: FLOWCTL(q, bp);
279: }
280:
281: /*
282: * send the block to the other side
283: */
284: static void
285: pipeoput(Queue *q, Block *bp)
286: {
287: PUTNEXT(q, bp);
288: }
289:
290: /*
291: * send a hangup and disconnect the streams
292: */
293: static void
294: pipestclose(Queue *q)
295: {
296: Block *bp;
297:
298: /*
299: * point to the bit-bucket and let any in-progress
300: * write's finish.
301: */
302: q->put = nullput;
303: wakeup(&q->r);
304:
305: /*
306: * send a hangup
307: */
308: q = q->other;
309: if(q->next == 0)
310: return;
311: bp = allocb(0);
312: bp->type = M_HANGUP;
313: PUTNEXT(q, bp);
314: }
315:
316: Pipe*
317: getpipe(ulong path)
318: {
319: Pipe *p;
320:
321: lock(&pipealloc);
322: for(p = pipealloc.pipe; p; p = p->next) {
323: if(path == p->path) {
324: unlock(&pipealloc);
325: return p;
326: }
327: }
328: unlock(&pipealloc);
329: panic("getpipe");
330: return 0; /* not reached */
331: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.