|
|
1.1 root 1: #include "u.h"
2: #include "lib.h"
3: #include "mem.h"
4: #include "dat.h"
5: #include "fns.h"
6: #include "io.h"
7: #include "errno.h"
8:
/*
 *  current time; presumably in milliseconds, since it is compared
 *  against deadlines built by adding MSrexmit and MSack to it.
 */
#define NOW (MACHP(0)->ticks*MS2HZ)

enum {
	/*
	 *  tunable configuration
	 */
	Nconv		= 32,		/* total number of active circuits */
	Nifc		= 2,		/* number of interfaces */
	MSrexmit	= 300,		/* retransmission interval in ms */
	MSack		= 50,		/* ms to sit on an ack */

	/*
	 *  derived or fixed values
	 */
	Ndir		= Nconv + 1,	/* entries in the nonet directory (circuits + clone) */
	Nsubdir		= 2,		/* entries in a conversation subdirectory */
	Nmsg		= 128,		/* max number of outstanding messages */
	Nmask		= Nmsg - 1,	/* mask for log(Nmsg) bits */
};
28:
29: typedef struct Hdr Hdr;
30: typedef struct Msg Msg;
31: typedef struct Conversation Conversation;
32: typedef struct Interface Interface;
33: typedef struct Etherhdr Etherhdr;
34:
35: /*
36: * generic nonet header
37: */
38: struct Hdr {
39: uchar circuit[3]; /* circuit number */
40: uchar flag;
41: uchar mid; /* message id */
42: uchar ack; /* piggy back ack */
43: uchar remain[2]; /* count of remaing bytes of data */
44: uchar sum[2]; /* checksum (0 means none) */
45: };
46: #define HDRSIZE 10
47: #define NEWCALL 0x1 /* flag bit marking a new circuit */
48: #define HANGUP 0x2 /* flag bit requesting hangup */
49: #define ACKME 0x4 /* acknowledge this message */
50:
51: /*
52: * a buffer describing a nonet message
53: */
54: struct Msg {
55: QLock;
56: Blist;
57: int mid; /* sequence number */
58: int rem; /* remaining */
59: long time;
60: int acked;
61: };
62:
/*
 *  values of Conversation.state.  the numeric order matters:
 *  the code tests ranges such as `state > Clistening' and
 *  `state >= Cconnected'.
 */
enum {
	Cclosed		= 0,
	Copen		= 1,
	Clistening	= 2,
	Cconnected	= 3,
	Cconnecting	= 4,
	Chungup		= 5,
	Cclosing	= 6,
};
75:
76: /*
77: * one exists for each Nonet conversation.
78: */
79: struct Conversation {
80: QLock;
81:
82: Queue *rq; /* input queue */
83: int version; /* incremented each time struct is changed */
84: int state; /* true if listening */
85:
86: Msg in[Nmsg]; /* messages being received */
87: int rcvcircuit; /* circuit number of incoming packets */
88:
89: uchar ack[Nmsg]; /* acknowledgements waiting to be sent */
90: long atime[Nmsg];
91: int afirst;
92: int anext;
93:
94: QLock xlock; /* one trasmitter at a time */
95: Rendez r; /* process waiting for an output mid */
96: Msg ctl; /* for control messages */
97: Msg out[Nmsg]; /* messages being sent */
98: int first; /* first unacknowledged message */
99: int next; /* next message buffer to use */
100: int lastacked; /* last message acked */
101: Block *media; /* prototype media output header */
102: Hdr *hdr; /* nonet header inside of media header */
103:
104: Interface *ifc;
105: int kstarted;
106: char raddr[64]; /* remote address */
107: int rexmit; /* statistics */
108: int retry;
109: int bad;
110: int sent;
111: int rcvd;
112: };
113:
114: /*
115: * a nonet interface. one exists for every stream that a
116: * nonet multiplexor is pushed onto.
117: */
118: struct Interface {
119: QLock;
120: int ref;
121: char name[64]; /* interface name */
122: int maxtu; /* maximum transfer unit */
123: int mintu; /* minimum transfer unit */
124: int hsize; /* media header size */
125: Queue *wq; /* interface output queue */
126: void (*connect)(Conversation *, char *);
127: Conversation conv[Nconv];
128: };
129: static Interface interface[Nifc];
130:
131: void nonetkproc(void *);
132: int cksum(Block*, int);
133: extern Qinfo noetherinfo;
134:
135: /*
136: * start a new conversation. start an ack/retransmit process if
137: * none already exists for this circuit.
138: */
139: static void
140: startconv(Conversation *cp, int circuit, int state)
141: {
142: int i;
143: char name[32];
144: Interface *ifc;
145:
146: ifc = cp->ifc;
147:
148: /*
149: * allocate the prototype header
150: */
151: cp->media = allocb(ifc->hsize + HDRSIZE);
152: cp->media->wptr += ifc->hsize + HDRSIZE;
153: cp->hdr = (Hdr *)(cp->media->rptr + ifc->hsize);
154:
155: /*
156: * fill in the circuit number
157: */
158: cp->hdr->flag = NEWCALL|ACKME;
159: cp->hdr->circuit[2] = circuit>>16;
160: cp->hdr->circuit[1] = circuit>>8;
161: cp->hdr->circuit[0] = circuit;
162:
163: /*
164: * set the state variables
165: */
166: cp->state = state;
167: cp->retry = 0;
168: for(i = 1; i < Nmsg; i++){
169: cp->in[i].mid = i;
170: cp->in[i].acked = 0;
171: cp->in[i].rem = 0;
172: cp->out[i].mid = i | Nmsg;
173: cp->out[i].acked = 1;
174: cp->out[i].rem = 0;
175: }
176: cp->in[0].mid = Nmsg;
177: cp->in[0].acked = 0;
178: cp->in[0].rem = 0;
179: cp->out[0].mid = 0;
180: cp->out[0].acked = 1;
181: cp->out[0].rem = 0;
182: cp->first = cp->next = 1;
183: cp->rexmit = cp->bad = cp->sent = cp->rcvd = cp->lastacked = 0;
184:
185: /*
186: * used for demultiplexing
187: */
188: cp->rcvcircuit = circuit ^ 1;
189:
190: /*
191: * start the ack/rexmit process
192: */
193: if(cp->kstarted == 0){
194: cp->kstarted = 1;
195: sprint(name, "**nonet%d**", cp - ifc->conv);
196: kproc(name, nonetkproc, cp);
197: }
198: }
199:
200: /*
201: * connect to the destination whose name is pointed to by bp->rptr.
202: */
203: void
204: connect(Conversation *cp, Block *bp)
205: {
206: Interface *ifc;
207: char *iname;
208: int len;
209: int circuit;
210:
211: ifc = cp->ifc;
212: qlock(ifc);
213: if(waserror()){
214: qunlock(ifc);
215: if(cp->media){
216: freeb(cp->media);
217: cp->media = 0;
218: }
219: freeb(bp);
220: nexterror();
221: }
222:
223: /*
224: * init
225: */
226: startconv(cp, (++(cp->version) * Nconv) + 2*(cp - ifc->conv), Cconnecting);
227: (*ifc->connect)(cp, (char *)bp->rptr);
228: strncpy(cp->raddr, (char *)bp->rptr, sizeof(cp->raddr));
229:
230: qunlock(ifc);
231: freeb(bp);
232: poperror();
233: }
234:
235: /*
236: * listen for calls from any interface
237: */
238: static int
239: listendone(void *a)
240: {
241: Conversation *cp;
242:
243: cp = (Conversation *)a;
244: return cp->state != Clistening;
245: }
246: static void
247: listen(Conversation *cp, Block *bp)
248: {
249: freeb(bp);
250: if(cp->state >= Clistening){
251: print("listen in use %d %ux %ux\n", cp->state, cp->ifc, interface);
252: error(0, Einuse);
253: }
254: cp->state = Clistening;
255: sleep(&cp->r, listendone, cp);
256: }
257:
258: /*
259: * send a hangup signal up the stream to get all line disciplines
260: * to cease and desist
261: */
262: static void
263: hangup(Conversation *cp)
264: {
265: Block *bp;
266: Queue *q;
267:
268: cp->state = Chungup;
269: bp = allocb(0);
270: bp->type = M_HANGUP;
271: q = cp->rq;
272: PUTNEXT(q, bp);
273: wakeup(&cp->r);
274: }
275:
276: /*
277: * process a message acknowledgement. if the message
278: * has any xmit buffers queued, free them.
279: */
280: static void
281: rcvack(Conversation *cp, int mid)
282: {
283: Msg *mp;
284: Block *bp;
285: int i;
286:
287: mp = &cp->out[mid & Nmask];
288:
289: /*
290: * if already acked, ignore
291: */
292: if(mp->acked || mp->mid != mid)
293: return;
294: mp->acked = 1;
295: cp->lastacked = mid;
296:
297: /*
298: * free buffers
299: */
300: qlock(mp);
301: while(bp = getb(mp))
302: freeb(bp);
303: qunlock(mp);
304:
305: /*
306: * advance the first pointer and wakeup any processes waiting for a mid
307: */
308: if((mid&Nmask) == cp->first){
309: cp->retry = 0;
310: for(i = cp->first; i!=cp->next && cp->out[i].acked; i = (i+1)&Nmask)
311: ;
312: cp->first = i;
313: wakeup(&cp->r);
314: }
315: }
316:
317: /*
318: * queue an acknowledgement to be sent. ignore it if we already have Nmsg
319: * acknowledgements queued.
320: */
321: static void
322: queueack(Conversation *cp, int mid)
323: {
324: int next;
325: ulong now;
326:
327: now = NOW;
328: next = (cp->anext + 1)&Nmask;
329: if(next != cp->afirst){
330: cp->ack[cp->anext] = mid;
331: cp->atime[cp->anext] = now + MSack;
332: cp->anext = next;
333: }
334: if(now > cp->atime[cp->afirst])
335: wakeup(&cp->rq->r);
336: }
337:
338: /*
339: * make a packet header
340: */
341: Block *
342: mkhdr(Conversation *cp, int rem)
343: {
344: Block *bp;
345: Hdr *hp;
346:
347: bp = allocb(cp->ifc->hsize + HDRSIZE + cp->ifc->mintu);
348: memcpy(bp->wptr, cp->media->rptr, cp->ifc->hsize + HDRSIZE);
349: bp->wptr += cp->ifc->hsize + HDRSIZE;
350: hp = (Hdr *)(bp->rptr + cp->ifc->hsize);
351: hp->remain[1] = rem>>8;
352: hp->remain[0] = rem;
353: hp->sum[0] = hp->sum[1] = 0;
354: return bp;
355: }
356:
357: /*
358: * transmit a message. this involves breaking a possibly multi-block message into
359: * a train of packets on the media.
360: *
361: * called by nonetoput() and nonetkproc(). the qlock(mp) synchronizes these two
362: * processes.
363: */
364: static void
365: sendmsg(Conversation *cp, Msg *mp)
366: {
367: Interface *ifc;
368: Queue *wq;
369: int msgrem;
370: int pktrem;
371: int n;
372: Block *bp, *pkt, *last;
373: uchar *rptr;
374:
375: ifc = cp->ifc;
376: if(ifc == 0)
377: return;
378: wq = ifc->wq->next;
379:
380: /*
381: * one transmitter at a time
382: */
383: qlock(&cp->xlock);
384:
385: /*
386: * synchronize with rcvack, don't want to send while the
387: * message is being freed.
388: */
389: qlock(mp);
390:
391: if(waserror() || mp->acked){
392: qunlock(&cp->xlock);
393: qunlock(mp);
394: return;
395: }
396:
397: /*
398: * get the next acknowledge to use if the next queue up
399: * is not full.
400: */
401: if(cp->afirst != cp->anext && cp->rq->next->len < 16*1024){
402: cp->hdr->ack = cp->ack[cp->afirst];
403: cp->afirst = (cp->afirst+1)&Nmask;
404: }
405: cp->hdr->mid = mp->mid;
406:
407: if(ifc->mintu > mp->len) {
408: /*
409: * short message:
410: * copy the whole message into the header block
411: */
412: last = pkt = mkhdr(cp, mp->len);
413: for(bp = mp->first; bp; bp = bp->next){
414: memcpy(pkt->wptr, bp->rptr, n = BLEN(bp));
415: pkt->wptr += n;
416: }
417: memset(pkt->wptr, 0, n = ifc->mintu - mp->len);
418: pkt->wptr += n;
419: } else {
420: /*
421: * long message:
422: * break up the message into interface packets and send them.
423: * once around this loop for each non-header block generated.
424: */
425: msgrem = mp->len;
426: pktrem = msgrem > ifc->maxtu ? ifc->maxtu : msgrem;
427: bp = mp->first;
428: if(bp)
429: rptr = bp->rptr;
430: last = pkt = mkhdr(cp, msgrem);
431: while(bp){
432: /*
433: * if pkt full, send and create new header block
434: */
435: if(pktrem == 0){
436: cksum(pkt, ifc->hsize);
437: last->flags |= S_DELIM;
438: (*wq->put)(wq, pkt);
439: last = pkt = mkhdr(cp, -msgrem);
440: pktrem = msgrem > ifc->maxtu ? ifc->maxtu : msgrem;
441: }
442: n = bp->wptr - rptr;
443: if(n > pktrem)
444: n = pktrem;
445: last = last->next = allocb(0);
446: last->rptr = rptr;
447: last->wptr = rptr = rptr + n;
448: msgrem -= n;
449: pktrem -= n;
450: if(rptr >= bp->wptr){
451: bp = bp->next;
452: if(bp)
453: rptr = bp->rptr;
454: }
455: }
456: }
457: cksum(pkt, ifc->hsize);
458: last->flags |= S_DELIM;
459: (*wq->put)(wq, pkt);
460: mp->time = NOW + MSrexmit;
461: qunlock(mp);
462: qunlock(&cp->xlock);
463: poperror();
464: }
465:
466: /*
467: * send a control message (hangup or acknowledgement).
468: */
469: sendctlmsg(Conversation *cp, int flag, int new)
470: {
471: cp->ctl.len = 0;
472: cp->ctl.first = 0;
473: cp->ctl.acked = 0;
474: if(new)
475: cp->ctl.mid = Nmsg^cp->out[cp->next].mid;
476: else
477: cp->ctl.mid = cp->lastacked;
478: cp->hdr->flag |= flag;
479: sendmsg(cp, &cp->ctl);
480: }
481:
482: /*
483: * receive a message (called by the multiplexor; noetheriput, nofddiiput, ...)
484: */
485: static void
486: rcvmsg(Conversation *cp, Block *bp)
487: {
488: Block *nbp;
489: Hdr *h;
490: short r;
491: int c;
492: Msg *mp;
493: int f;
494: Queue *q;
495:
496: q = cp->rq;
497:
498: /*
499: * grab the packet header, push the pointer past the nonet header
500: */
501: h = (Hdr *)bp->rptr;
502: bp->rptr += HDRSIZE;
503: mp = &cp->in[h->mid & Nmask];
504: r = (h->remain[1]<<8) | h->remain[0];
505: f = h->flag;
506:
507: /*
508: * if a new call request comes in on a connected channel, hang up the call
509: */
510: if(h->mid==0 && (f & NEWCALL) && cp->state==Cconnected){
511: freeb(bp);
512: hangup(cp);
513: return;
514: }
515:
516: /*
517: * ignore old messages and process the acknowledgement
518: */
519: if(h->mid != mp->mid){
520: if(r == 0){
521: rcvack(cp, h->ack);
522: if(f & HANGUP)
523: hangup(cp);
524: } else {
525: if(r>0)
526: queueack(cp, h->mid);
527: cp->bad++;
528: }
529: freeb(bp);
530: return;
531: }
532:
533: if(r>=0){
534: /*
535: * start of message packet
536: */
537: if(mp->first){
538: cp->bad++;
539: freeb(bp);
540: return;
541: }
542: mp->rem = r;
543: } else {
544: /*
545: * a continuation
546: */
547: if(-r != mp->rem) {
548: cp->bad++;
549: freeb(bp);
550: return;
551: }
552: }
553:
554: /*
555: * take care of packets that were padded up
556: */
557: mp->rem -= BLEN(bp);
558: if(mp->rem < 0){
559: if(-mp->rem <= BLEN(bp)){
560: bp->wptr += mp->rem;
561: mp->rem = 0;
562: } else
563: panic("rcvmsg: short packet");
564: }
565: putb(mp, bp);
566:
567: /*
568: * if the last chunk - pass it up the stream and wake any
569: * waiting process.
570: *
571: * if not, strip off the delimiter.
572: */
573: if(mp->rem == 0){
574: rcvack(cp, h->ack);
575: if(f & ACKME)
576: queueack(cp, h->mid);
577: mp->last->flags |= S_DELIM;
578: PUTNEXT(q, mp->first);
579: mp->first = mp->last = 0;
580: mp->len = 0;
581: cp->rcvd++;
582:
583: /*
584: * cycle bufffer to next expected mid
585: */
586: mp->mid ^= Nmsg;
587:
588: /*
589: * stop xmitting the NEWCALL flag
590: */
591: if(cp->state==Cconnecting && !(f & NEWCALL))
592: cp->state = Cconnected;
593: } else
594: mp->last->flags &= ~S_DELIM;
595:
596: }
597:
598: /*
599: * the device stream module definition
600: */
601: static void nonetstopen(Queue *, Stream *);
602: static void nonetstclose(Queue *);
603: static void nonetoput(Queue *, Block *);
604: static void nonetiput(Queue *, Block *);
605: Qinfo nonetinfo = { nonetiput, nonetoput, nonetstopen, nonetstclose, "nonet" } ;
606:
607: /*
608: * store the device end of the stream so that the multiplexor can
609: * send blocks upstream. this is called by streamopen() when a
610: * nonet device steam is created.
611: */
612: static void
613: nonetstopen(Queue *q, Stream *s)
614: {
615: Interface *ifc;
616: Conversation *cp;
617:
618: ifc = &interface[s->dev];
619: cp = &ifc->conv[s->id];
620: cp->ifc = ifc;
621: cp->rq = RD(q);
622: cp->state = Copen;
623: RD(q)->ptr = WR(q)->ptr = (void *)cp;
624: }
625:
626: /*
627: * wait until all output has drained or a hangup is received.
628: * then send a hangup message (until one is received).
629: */
630: static int
631: isflushed(void *a)
632: {
633: Conversation *cp;
634:
635: cp = (Conversation *)a;
636: return cp->first == cp->next || cp->state == Chungup;
637: }
638: static int
639: ishungup(void *a)
640: {
641: Conversation *cp;
642:
643: cp = (Conversation *)a;
644: return cp->state == Chungup;
645: }
646: static int
647: isdead(void *a)
648: {
649: Conversation *cp;
650:
651: cp = (Conversation *)a;
652: return cp->kstarted == 0;
653: }
654: static void
655: nonetstclose(Queue *q)
656: {
657: Conversation *cp;
658: Msg *mp;
659: int i;
660:
661: cp = (Conversation *)q->ptr;
662:
663: /*
664: * wait for all messages to drain
665: */
666: while(!isflushed(cp))
667: sleep(&cp->r, isflushed, cp);
668:
669: /*
670: * ack all outstanding messages
671: */
672: for(; cp->first != cp->next; cp->first = (cp->first+1)&Nmask) {
673: mp = &cp->out[cp->first];
674: if(!mp->acked)
675: rcvack(cp, mp->mid);
676: }
677: cp->first = cp->next;
678:
679: /*
680: * send hangup messages to the other side
681: * until it hangs up.
682: */
683: if(cp->state >= Cconnected){
684: sendctlmsg(cp, HANGUP, 1);
685: for(i=0; i<10 && !ishungup(cp); i++){
686: sendctlmsg(cp, HANGUP, 1);
687: tsleep(&cp->r, ishungup, cp, MSrexmit);
688: }
689: }
690:
691: /*
692: * kill off the nonetkproc
693: */
694: cp->state = Cclosed;
695: wakeup(&cp->rq->r);
696: sleep(&cp->r, isdead, cp);
697:
698: /*
699: * close down, synchronizing with interface
700: */
701: qlock(cp);
702: cp->ifc = 0;
703: qunlock(cp);
704: }
705:
706: /*
707: * send all messages up stream. this should only be control messages
708: */
709: static void
710: nonetiput(Queue *q, Block *bp)
711: {
712: Conversation *cp;
713:
714: if(bp->type == M_HANGUP){
715: cp = (Conversation *)q->ptr;
716: cp->state = Chungup;
717: }
718: PUTNEXT(q, bp);
719: }
720:
721: /*
722: * queue a block
723: */
724: static int
725: windowopen(void *a)
726: {
727: Conversation *cp;
728:
729: cp = (Conversation *)a;
730: return (cp->next + 1)&Nmask != cp->first;
731: }
732: static void
733: nonetoput(Queue *q, Block *bp)
734: {
735: Conversation *cp;
736: int next;
737: Msg *mp;
738:
739: cp = (Conversation *)(q->ptr);
740:
741: /*
742: * do all control functions
743: */
744: if(bp->type != M_DATA){
745: if(streamparse("connect", bp))
746: connect(cp, bp);
747: else if(streamparse("listen", bp))
748: listen(cp, bp);
749: else
750: freeb(bp);
751: return;
752: }
753:
754: /*
755: * collect till we see a delim
756: */
757: if(!putb(q, bp))
758: return;
759:
760: /*
761: * block if we don't have any free mid's
762: */
763: while((next = (cp->next + 1)&Nmask) == cp->first)
764: sleep(&cp->r, windowopen, cp);
765:
766: /*
767: * stick the message in a Msg structure
768: */
769: mp = &cp->out[cp->next];
770: mp->time = NOW + MSrexmit;
771: mp->first = q->first;
772: mp->last = q->last;
773: mp->len = q->len;
774: mp->mid ^= Nmsg;
775: mp->acked = 0;
776:
777: /*
778: * init the queue for new messages
779: */
780: q->len = 0;
781: q->first = q->last = 0;
782:
783: /*
784: * send it
785: */
786: cp->next = next;
787: sendmsg(cp, mp);
788: cp->sent++;
789: }
790:
/*
 *  sleep condition that is never satisfied (despite the name, it
 *  returns 0 every time).  nonetkproc passes it to tsleep.
 */
static int
always(void *a)
{
	return 0;
}
799: void
800: nonetkproc(void *arg)
801: {
802: Conversation *cp;
803: Interface *ifc;
804: Queue *wq;
805: Msg *mp;
806: int i;
807:
808: cp = (Conversation *)arg;
809:
810: for(;;){
811: /*
812: * die on request
813: */
814: if(cp->state == Cclosed){
815: cp->kstarted = 0;
816: wakeup(&cp->r);
817: return;
818: }
819:
820: /*
821: * retransmit first message
822: */
823: if(cp->first != cp->next){
824: mp = &cp->out[cp->first];
825: if(!mp->acked && NOW >= mp->time){
826: if(cp->retry++ > 400)
827: hangup(cp);
828: else {
829: cp->rexmit++;
830: sendmsg(cp, mp);
831: }
832: }
833: }
834:
835: /*
836: * send any ack whose time is come
837: */
838: while(cp->afirst != cp->anext && NOW >= cp->atime[cp->anext]
839: && cp->rq->next->len < Streamhi)
840: sendctlmsg(cp, 0, 0);
841: tsleep(&cp->rq->r, always, 0, MSrexmit/2);
842: }
843: }
844:
845: /*
846: * nonet directory and subdirectory
847: */
848: enum {
849: Nraddrqid,
850: Nstatsqid,
851: Nchanqid,
852: Ncloneqid,
853: };
854: Dirtab nonetdir[Ndir];
855: Dirtab nosubdir[]={
856: "raddr", Nraddrqid, 0, 0600,
857: "stats", Nstatsqid, 0, 0600,
858: };
859:
860: /*
861: * nonet file system. most of the calls use dev.c to access the nonet
862: * directory and stream.c to access the nonet devices.
863: */
864: void
865: nonetreset(void)
866: {
867: newqinfo(&noetherinfo);
868: }
869:
870: /*
871: * create the nonet directory. the files are `clone' and stream
872: * directories '1' to '32' (or whatever Nconv is in decimal)
873: */
874: void
875: nonetinit(void)
876: {
877: int i;
878:
879: /*
880: * create the directory.
881: */
882: /*
883: * the circuits
884: */
885: for(i = 0; i < Nconv; i++) {
886: sprint(nonetdir[i].name, "%d", i);
887: nonetdir[i].qid = CHDIR|STREAMQID(i, Nchanqid);
888: nonetdir[i].length = 0;
889: nonetdir[i].perm = 0600;
890: }
891:
892: /*
893: * the clone device
894: */
895: strcpy(nonetdir[i].name, "clone");
896: nonetdir[i].qid = Ncloneqid;
897: nonetdir[i].length = 0;
898: nonetdir[i].perm = 0600;
899:
900: }
901:
902: Chan*
903: nonetattach(char *spec)
904: {
905: Interface *ifc;
906: Chan *c;
907:
908: /*
909: * find an interface with the same name
910: */
911: for(ifc = interface; ifc < &interface[Nifc]; ifc++){
912: qlock(ifc);
913: if(strcmp(spec, ifc->name)==0 && ifc->wq) {
914: ifc->ref++;
915: qunlock(ifc);
916: break;
917: }
918: qunlock(ifc);
919: }
920: if(ifc == &interface[Nifc])
921: error(0, Enoifc);
922: c = devattach('n', spec);
923: c->dev = 0;
924: return c;
925: }
926:
927: Chan*
928: nonetclone(Chan *c, Chan *nc)
929: {
930: Interface *ifc;
931:
932: c = devclone(c, nc);
933: ifc = &interface[c->dev];
934: qlock(ifc);
935: ifc->ref++;
936: qunlock(ifc);
937: return c;
938: }
939:
940: int
941: nonetwalk(Chan *c, char *name)
942: {
943: if(c->qid == CHDIR)
944: return devwalk(c, name, nonetdir, Ndir, devgen);
945: else
946: return devwalk(c, name, nosubdir, Nsubdir, streamgen);
947: }
948:
949: void
950: nonetstat(Chan *c, char *dp)
951: {
952: if(c->qid == CHDIR)
953: devstat(c, dp, nonetdir, Ndir, devgen);
954: else
955: devstat(c, dp, nosubdir, Nsubdir, streamgen);
956: }
957:
958: /*
959: * opening a nonet device allocates a Conversation. Opening the `clone'
960: * device is a ``macro'' for finding a free Conversation and opening
961: * it's ctl file.
962: */
963: Chan*
964: nonetopen(Chan *c, int omode)
965: {
966: extern Qinfo nonetinfo;
967: Stream *s;
968: Conversation *cp;
969: Interface *ifc;
970:
971: if(c->qid == Ncloneqid){
972: ifc = &interface[c->dev];
973: for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){
974: if(cp->state == Cclosed && canqlock(cp)){
975: if(cp->state != Cclosed){
976: qunlock(cp);
977: continue;
978: }
979: c->qid = CHDIR|STREAMQID(cp-ifc->conv, Nchanqid);
980: devwalk(c, "ctl", 0, 0, streamgen);
981: streamopen(c, &nonetinfo);
982: qunlock(cp);
983: break;
984: }
985: }
986: if(cp == &ifc->conv[Nconv])
987: error(0, Enodev);
988: } else if(c->qid != CHDIR)
989: streamopen(c, &nonetinfo);
990:
991: c->mode = openmode(omode);
992: c->flag |= COPEN;
993: c->offset = 0;
994: return c;
995: }
996:
997: void
998: nonetcreate(Chan *c, char *name, int omode, ulong perm)
999: {
1000: error(0, Eperm);
1001: }
1002:
1003: void
1004: nonetclose(Chan *c)
1005: {
1006: Interface *ifc;
1007:
1008: /* real closing happens in lancestclose */
1009: if(c->qid != CHDIR)
1010: streamclose(c);
1011:
1012: ifc = &interface[c->dev];
1013: qlock(ifc);
1014: ifc->ref--;
1015: qunlock(ifc);
1016: }
1017:
1018: long
1019: nonetread(Chan *c, void *a, long n)
1020: {
1021: int t;
1022: Conversation *cp;
1023: char stats[256];
1024:
1025: t = STREAMTYPE(c->qid);
1026: if(t >= Slowqid)
1027: return streamread(c, a, n);
1028:
1029: if(c->qid == CHDIR)
1030: return devdirread(c, a, n, nonetdir, Ndir, devgen);
1031: if(c->qid & CHDIR)
1032: return devdirread(c, a, n, nosubdir, Nsubdir, streamgen);
1033:
1034: cp = &interface[c->dev].conv[STREAMID(c->qid)];
1035: switch(t){
1036: case Nraddrqid:
1037: return stringread(c, a, n, cp->raddr);
1038: case Nstatsqid:
1039: sprint(stats, "sent: %d\nrcved: %d\nrexmit: %d\nbad: %d\n",
1040: cp->sent, cp->rcvd, cp->rexmit, cp->bad);
1041: return stringread(c, a, n, stats);
1042: }
1043: error(0, Eperm);
1044: }
1045:
1046: long
1047: nonetwrite(Chan *c, void *a, long n)
1048: {
1049: int t;
1050:
1051: t = STREAMTYPE(c->qid);
1052: if(t >= Slowqid)
1053: return streamwrite(c, a, n, 0);
1054:
1055: error(0, Eperm);
1056: }
1057:
1058: void
1059: nonetremove(Chan *c)
1060: {
1061: error(0, Eperm);
1062: }
1063:
1064: void
1065: nonetwstat(Chan *c, char *dp)
1066: {
1067: error(0, Eperm);
1068: }
1069:
1070: void
1071: noneterrstr(Error *e, char *buf)
1072: {
1073: rooterrstr(e, buf);
1074: }
1075:
1076: void
1077: nonetuserstr(Error *e, char *buf)
1078: {
1079: extern consuserstr(Error *, char *);
1080:
1081: consuserstr(e, buf);
1082: }
1083:
1084: /*
1085: * interface
1086: */
1087: /*
1088: * Create an interface. The qlock on ifc prevents a circuit
1089: * from connecting to (nonetconnect) or outputting on (nonetoput)
1090: * the interface while it is being created.
1091: */
1092: Interface *
1093: newifc(Queue *q, Stream *s, int maxtu, int mintu, int hsize,
1094: void (*connect)(Conversation *, char *))
1095: {
1096: Interface *ifc;
1097:
1098: for(ifc = interface; ifc < &interface[Nifc]; ifc++){
1099: if(ifc->wq == 0){
1100: qlock(ifc);
1101: if(ifc->wq) {
1102: /* someone was faster than us */
1103: qunlock(ifc);
1104: continue;
1105: }
1106: RD(q)->ptr = WR(q)->ptr = (void *)ifc;
1107: ifc->maxtu = maxtu - hsize - HDRSIZE;
1108: ifc->mintu = mintu - hsize - HDRSIZE;
1109: ifc->hsize = hsize;
1110: ifc->connect = connect;
1111: ifc->wq = WR(q);
1112: ifc->name[0] = 0;
1113: qunlock(ifc);
1114: return ifc;
1115: }
1116: }
1117: error(0, Enoifc);
1118: }
1119:
1120: /*
1121: * Free an interface.
1122: */
1123: void
1124: freeifc(Interface *ifc)
1125: {
1126: qlock(ifc);
1127: if(ifc->ref){
1128: qunlock(ifc);
1129: print("freeifc in use\n");
1130: error(0, Einuse);
1131: }
1132: ifc->wq = 0;
1133: qunlock(ifc);
1134: }
1135: /*
1136: * ethernet specific multiplexor
1137: */
1138: /*
1139: * ethernet header of a packet
1140: */
1141: struct Etherhdr {
1142: uchar d[6];
1143: uchar s[6];
1144: uchar type[2];
1145: uchar circuit[3]; /* circuit number */
1146: uchar flag;
1147: uchar mid; /* message id */
1148: uchar ack; /* piggy back ack */
1149: uchar remain[2]; /* count of remaing bytes of data */
1150: uchar sum[2]; /* checksum (0 means none) */
1151: };
1152: #define EHDRSIZE 24
1153: #define EMAXBODY (1514-HDRSIZE) /* maximum ethernet packet body */
1154: #define ETHER_TYPE 0x900 /* most significant byte last */
1155:
1156: /*
1157: * the ethernet multiplexor stream module definition
1158: */
1159: static void noetheropen(Queue *, Stream *);
1160: static void noetherclose(Queue *);
1161: static void noetheroput(Queue *, Block *);
1162: static void noetheriput(Queue *, Block *);
1163: Qinfo noetherinfo = { noetheriput, nullput, noetheropen, noetherclose, "noether" };
1164:
1165: /*
1166: * parse an ethernet address (assumed to be 12 ascii hex digits)
1167: */
1168: void
1169: etherparse(uchar *to, char *from)
1170: {
1171: int tdig;
1172: int fdig;
1173: int i;
1174:
1175: if(strlen(from) != 12)
1176: error(0, Ebadnet);
1177:
1178: for(i = 0; i < 6; i++){
1179: fdig = *from++;
1180: tdig = fdig > 'a' ? (fdig - 'a' + 10)
1181: : (fdig > 'A' ? (fdig - 'A' + 10) : (fdig - '0'));
1182: fdig = *from++;
1183: tdig <<= 4;
1184: tdig |= fdig > 'a' ? (fdig - 'a' + 10)
1185: : (fdig > 'A' ? (fdig - 'A' + 10) : (fdig - '0'));
1186: *to++ = tdig;
1187: }
1188: }
1189:
1190: /*
1191: * perfrorm the ether specific part of nonetconnect. just stick
1192: * the address into the prototype header.
1193: */
1194: void
1195: noetherconnect(Conversation *cp, char *ea)
1196: {
1197: Etherhdr *eh;
1198:
1199: /*
1200: * special hack for ross
1201: */
1202: if(strcmp(ea, "020701005eff")==0)
1203: cp->hdr->flag &= ~ACKME;
1204:
1205: eh = (Etherhdr *)cp->media->rptr;
1206: etherparse(eh->d, ea);
1207: eh->type[0] = ETHER_TYPE>>8;
1208: eh->type[1] = ETHER_TYPE;
1209: }
1210:
1211: /*
1212: * set up an ether interface
1213: */
1214: static void
1215: noetheropen(Queue *q, Stream *s)
1216: {
1217: newifc(q, s, 1514, 60, 14, noetherconnect);
1218: }
1219:
1220: /*
1221: * tear down an ether interface
1222: */
1223: static void
1224: noetherclose(Queue *q)
1225: {
1226: Interface *ifc;
1227:
1228: ifc = (Interface *)(q->ptr);
1229: freeifc(ifc);
1230: }
1231:
1232: /*
1233: * Input a packet and use the ether address to select the correct
1234: * nonet device to pass it to.
1235: *
1236: * Simplifying assumption: one put == one packet && the complete header
1237: * is in the first block. If this isn't true, demultiplexing will not work.
1238: */
1239: static void
1240: noetheriput(Queue *q, Block *bp)
1241: {
1242: Interface *ifc;
1243: int circuit;
1244: Conversation *cp;
1245: Etherhdr *h;
1246: Etherhdr *ph;
1247: ulong s;
1248: Block *end;
1249:
1250: if(bp->type != M_DATA){
1251: PUTNEXT(q, bp);
1252: return;
1253: }
1254:
1255: ifc = (Interface *)(q->ptr);
1256: h = (Etherhdr *)(bp->rptr);
1257: circuit = (h->circuit[2]<<16) | (h->circuit[1]<<8) | h->circuit[0];
1258: s = (h->sum[1]<<8) | h->sum[0];
1259: if(s && s != cksum(bp, 14)){
1260: print("checksum error %ux %ux\n", s, (h->sum[1]<<8) | h->sum[0]);
1261: freeb(bp);
1262: return;
1263: }
1264:
1265: /*
1266: * look for an existing circuit.
1267: */
1268: for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){
1269: if(cp->state > Clistening && circuit == cp->rcvcircuit && canqlock(cp)){
1270: ph = (Etherhdr *)(cp->media->rptr);
1271: if(ifc == cp->ifc
1272: && circuit == cp->rcvcircuit
1273: && cp->state > Clistening
1274: && memcmp(ph->d, h->s, sizeof(h->s)) == 0){
1275: cp->hdr->flag &= ~NEWCALL;
1276: bp->rptr += ifc->hsize;
1277: rcvmsg(cp, bp);
1278: qunlock(cp);
1279: return;
1280: }
1281: qunlock(cp);
1282: }
1283: }
1284:
1285: /*
1286: * ... or one just listening
1287: */
1288: if((h->flag & NEWCALL) == 0) {
1289: freeb(bp);
1290: return;
1291: }
1292: for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){
1293: if(cp->state == Clistening && canqlock(cp)) {
1294: /*
1295: * initialize the conversation
1296: */
1297: startconv(cp, circuit^1, Cconnecting);
1298: wakeup(&cp->r);
1299: sprint(cp->raddr, "%.2ux%.2ux%.2ux%.2ux%.2ux%.2ux", h->s[0],
1300: h->s[1], h->s[2], h->s[3], h->s[4], h->s[5]);
1301:
1302: /*
1303: * fill in media dependent prototype header
1304: */
1305: ph = (Etherhdr *)(cp->media->rptr);
1306: memcpy(ph->d, h->s, sizeof(h->s));
1307: ph->type[0] = ETHER_TYPE>>8;
1308: ph->type[1] = ETHER_TYPE;
1309:
1310: /*
1311: * pass on the packet
1312: */
1313: bp->rptr += ifc->hsize;
1314: rcvmsg(cp, bp);
1315: qunlock(cp);
1316: return;
1317: }
1318: }
1319:
1320: /*
1321: * not found
1322: */
1323: freeb(bp);
1324: }
1325:
1326: /*
1327: * calculate the checksum of a list of blocks. ignore the first `offset' bytes.
1328: */
1329: int
1330: cksum(Block *bp, int offset)
1331: {
1332: Block *nbp = bp;
1333: uchar *ep, *p;
1334: int n;
1335: ulong s;
1336: Hdr *hp;
1337:
1338: s = 0;
1339: p = bp->rptr + offset;
1340: n = bp->wptr - p;
1341: hp = (Hdr *)p;
1342: hp->sum[0] = hp->sum[1] = 0;
1343: for(;;){
1344: ep = p+(n&~0x7);
1345: while(p < ep) {
1346: s = s + s + p[0];
1347: s = s + s + p[1];
1348: s = s + s + p[2];
1349: s = s + s + p[3];
1350: s = s + s + p[4];
1351: s = s + s + p[5];
1352: s = s + s + p[6];
1353: s = s + s + p[7];
1354: s = (s&0xffff) + (s>>16);
1355: p += 8;
1356: }
1357: ep = p+(n&0x7);
1358: while(p < ep) {
1359: s = s + s + *p;
1360: p++;
1361: }
1362: s = (s&0xffff) + (s>>16);
1363: bp = bp->next;
1364: if(bp == 0)
1365: break;
1366: p = bp->rptr;
1367: n = BLEN(bp);
1368: }
1369: s = (s&0xffff) + (s>>16);
1370: hp->sum[1] = s>>8;
1371: hp->sum[0] = s;
1372: return s & 0xffff;
1373: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.