File:  [Plan 9 NeXT] / lucent / sys / src / 9 / port / devpipe.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs
Tue Apr 24 18:01:03 2018 UTC (8 years, 1 month ago) by root
Branches: lucent, MAIN
CVS tags: plan9, HEAD
Plan 9 NeXT

#include	"u.h"
#include	"../port/lib.h"
#include	"mem.h"
#include	"dat.h"
#include	"fns.h"
#include	"../port/error.h"

#include	"devtab.h"

typedef struct Pipe	Pipe;
struct Pipe
{
	Ref;
	QLock;
	Pipe	*next;
	ulong	path;
};

struct
{
	Lock;
	Pipe	*pipe;
	ulong	path;
} pipealloc;

static Pipe *getpipe(ulong);
static void pipeiput(Queue*, Block*);
static void pipeoput(Queue*, Block*);
static void pipestclose(Queue *);

Qinfo pipeinfo =
{
	pipeiput,
	pipeoput,
	0,
	pipestclose,
	"pipe"
};

Dirtab pipedir[] =
{
	"data",		{Sdataqid},	0,			0600,
	"ctl",		{Sctlqid},	0,			0600,
	"data1",	{Sdataqid},	0,			0600,
	"ctl1",		{Sctlqid},	0,			0600,
};
#define NPIPEDIR 4

void
pipeinit(void)
{
}

void
pipereset(void)
{
}

/*
 *  create a pipe, no streams are created until an open
 */
Chan*
pipeattach(char *spec)
{
	Pipe *p;
	Chan *c;

	c = devattach('|', spec);
	p = smalloc(sizeof(Pipe));
	p->ref = 1;

	lock(&pipealloc);
	p->path = ++pipealloc.path;
	p->next = pipealloc.pipe;
	pipealloc.pipe = p;
	unlock(&pipealloc);

	c->qid = (Qid){CHDIR|STREAMQID(2*p->path, 0), 0};
	c->dev = 0;
	return c;
}

Chan*
pipeclone(Chan *c, Chan *nc)
{
	Pipe *p;

	p = getpipe(STREAMID(c->qid.path)/2);
	nc = devclone(c, nc);
	if(incref(p) <= 1)
		panic("pipeclone");
	return nc;
}

int
pipegen(Chan *c, Dirtab *tab, int ntab, int i, Dir *dp)
{
	int id;

	id = STREAMID(c->qid.path);
	if(i > 1)
		id++;
	if(tab==0 || i>=ntab)
		return -1;
	tab += i;
	devdir(c, (Qid){STREAMQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp);
	return 1;
}


int
pipewalk(Chan *c, char *name)
{
	return devwalk(c, name, pipedir, NPIPEDIR, pipegen);
}

void
pipestat(Chan *c, char *db)
{
	streamstat(c, db, "pipe", 0666);
}

/*
 *  if the stream doesn't exist, create it
 */
Chan *
pipeopen(Chan *c, int omode)
{
	Pipe *p;
	int other;
	Stream *local, *remote;

	if(c->qid.path & CHDIR){
		if(omode != OREAD)
			error(Ebadarg);
		c->mode = omode;
		c->flag |= COPEN;
		c->offset = 0;
		return c;
	}

	p = getpipe(STREAMID(c->qid.path)/2);
	if(waserror()){
		qunlock(p);
		nexterror();
	}
	qlock(p);
	streamopen(c, &pipeinfo);
	local = c->stream;
	if(local->devq->ptr == 0){
		/*
		 *  first open, create the other end also
		 */
		other = STREAMID(c->qid.path)^1;
		remote = streamnew(c->type, c->dev, other, &pipeinfo,1);

		/*
		 *  connect the device ends of both streams
		 */
		local->devq->ptr = remote;
		remote->devq->ptr = local;
		local->devq->other->next = remote->devq;
		remote->devq->other->next = local->devq;
	} else if(local->opens == 1){
		/*
		 *  keep other side around till last close of this side
		 */
		streamenter(local->devq->ptr);
	}
	qunlock(p);
	poperror();

	c->mode = openmode(omode);
	c->flag |= COPEN;
	c->offset = 0;
	return c;
}

void
pipecreate(Chan *c, char *name, int omode, ulong perm)
{
	USED(c, name, omode, perm);
	error(Egreg);
}

void
piperemove(Chan *c)
{
	USED(c);
	error(Egreg);
}

void
pipewstat(Chan *c, char *db)
{
	USED(c, db);
	error(Eperm);
}

void
pipeclose(Chan *c)
{
	Pipe *p, *f, **l;
	Stream *remote;

	p = getpipe(STREAMID(c->qid.path)/2);

	/*
	 *  take care of local and remote streams
	 */
	if(c->stream){
		qlock(p);
		remote = c->stream->devq->ptr;
		if(streamclose(c) == 0){
			if(remote)
				streamexit(remote);
		}
		qunlock(p);
	}

	/*
	 *  free the structure
	 */
	if(decref(p) == 0){
		lock(&pipealloc);
		l = &pipealloc.pipe;
		for(f = *l; f; f = f->next) {
			if(f == p) {
				*l = p->next;
				break;
			}
			l = &f->next;
		}
		unlock(&pipealloc);
		free(p);
	}
}

long
piperead(Chan *c, void *va, long n, ulong offset)
{
	USED(offset);
	if(c->qid.path & CHDIR)
		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);

	return streamread(c, va, n);
}

/*
 *  a write to a closed pipe causes a note to be sent to
 *  the process.
 */
long
pipewrite(Chan *c, void *va, long n, ulong offset)
{
	USED(offset);

	/* avoid notes when pipe is a mounted stream */
	if(c->flag & CMSG)
		return streamwrite(c, va, n, 0);

	if(waserror()) {
		postnote(u->p, 1, "sys: write on closed pipe", NUser);
		error(Ehungup);
	}
	n = streamwrite(c, va, n, 0);
	poperror();
	return n;
}

/*
 *  send a block upstream to the process.
 *  sleep until there's room upstream.
 */
static void
pipeiput(Queue *q, Block *bp)
{
	FLOWCTL(q, bp);
}

/*
 *  send the block to the other side
 */
static void
pipeoput(Queue *q, Block *bp)
{
	PUTNEXT(q, bp);
}

/*
 *  send a hangup and disconnect the streams
 */
static void
pipestclose(Queue *q)
{
	Block *bp;

	/*
	 *  point to the bit-bucket and let any in-progress
	 *  write's finish.
	 */
	q->put = nullput;
	wakeup(&q->r);

	/*
	 *  send a hangup
	 */
	q = q->other;
	if(q->next == 0)
		return;
	bp = allocb(0);
	bp->type = M_HANGUP;
	PUTNEXT(q, bp);
}

Pipe*
getpipe(ulong path)
{
	Pipe *p;

	lock(&pipealloc);
	for(p = pipealloc.pipe; p; p = p->next) {
		if(path == p->path) {
			unlock(&pipealloc);
			return p;
		}
	}
	unlock(&pipealloc);
	panic("getpipe");
	return 0;		/* not reached */
}

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.