|
|
1.1 root 1: #include "u.h"
2: #include "../port/lib.h"
3: #include "mem.h"
4: #include "dat.h"
5: #include "fns.h"
6: #include "io.h"
7: #include "../port/error.h"
8:
9: enum {
10: MSrexmit= 1000,
11: Nmask= 0x7,
12: };
13:
14: #define DPRINT if(q->flag&QDEBUG)kprint
15:
16: typedef struct Urp Urp;
17:
18: #define NOW (MACHP(0)->ticks*MS2HZ)
19:
20: /*
21: * URP status
22: */
23: struct urpstat {
24: ulong input; /* bytes read from urp */
25: ulong output; /* bytes output to urp */
26: ulong rexmit; /* retransmit rejected urp msg */
27: ulong rjtrs; /* reject, trailer size */
28: ulong rjpks; /* reject, packet size */
29: ulong rjseq; /* reject, sequence number */
30: ulong levelb; /* unknown level b */
31: ulong enqsx; /* enqs sent */
32: ulong enqsr; /* enqs rcved */
33: } urpstat;
34:
35: struct Urp {
36: QLock;
37: Urp *list; /* list of all urp structures */
38: short state; /* flags */
39: Rendez r; /* process waiting for output to finish */
40:
41: /* input */
42: QLock ack; /* ack lock */
43: Queue *rq; /* input queue */
44: uchar iseq; /* last good input sequence number */
45: uchar lastecho; /* last echo/rej sent */
46: uchar trbuf[3]; /* trailer being collected */
47: short trx; /* # bytes in trailer being collected */
48: int blocks;
49:
50: /* output */
51: QLock xmit; /* output lock, only one process at a time */
52: Queue *wq; /* output queue */
53: int maxout; /* maximum outstanding unacked blocks */
54: int maxblock; /* max block size */
55: int next; /* next block to send */
56: int unechoed; /* first unechoed block */
57: int unacked; /* first unacked block */
58: int nxb; /* next xb to use */
59: Block *xb[8]; /* the xmit window buffer */
60: QLock xl[8];
61: ulong timer; /* timeout for xmit */
62: int rexmit;
63: };
64:
65: /* list of allocated urp structures (never freed) */
66: struct
67: {
68: Lock;
69: Urp *urp;
70: } urpalloc;
71:
72: Rendez urpkr;
73: QLock urpkl;
74: int urpkstarted;
75:
76: #define WINDOW(u) ((u)->unechoed>(u)->next ? (u)->unechoed+(u)->maxout-(u)->next-8 :\
77: (u)->unechoed+(u)->maxout-(u)->next)
78: #define IN(x, f, n) (f<=n ? (x>=f && x<n) : (x<n || x>=f))
79: #define NEXT(x) (((x)+1)&Nmask)
80:
81: /*
82: * Protocol control bytes
83: */
84: #define SEQ 0010 /* sequence number, ends trailers */
85: #undef ECHO
86: #define ECHO 0020 /* echos, data given to next queue */
87: #define REJ 0030 /* rejections, transmission error */
88: #define ACK 0040 /* acknowledgments */
89: #define BOT 0050 /* beginning of trailer */
90: #define BOTM 0051 /* beginning of trailer, more data follows */
91: #define BOTS 0052 /* seq update algorithm on this trailer */
92: #define SOU 0053 /* start of unsequenced trailer */
93: #define EOU 0054 /* end of unsequenced trailer */
94: #define ENQ 0055 /* xmitter requests flow/error status */
95: #define CHECK 0056 /* xmitter requests error status */
96: #define INITREQ 0057 /* request initialization */
97: #define INIT0 0060 /* disable trailer processing */
98: #define INIT1 0061 /* enable trailer procesing */
99: #define AINIT 0062 /* response to INIT0/INIT1 */
100: #undef DELAY
101: #define DELAY 0100 /* real-time printing delay */
102: #define BREAK 0110 /* Send/receive break (new style) */
103:
104: #define REJECTING 0x1
105: #define INITING 0x2
106: #define HUNGUP 0x4
107: #define OPEN 0x8
108: #define CLOSING 0x10
109:
110: /*
111: * predeclared
112: */
113: static void urpreset(void);
114: static void urpciput(Queue*, Block*);
115: static void urpiput(Queue*, Block*);
116: static void urpoput(Queue*, Block*);
117: static void urpopen(Queue*, Stream*);
118: static void urpclose(Queue *);
119: static void output(Urp*);
120: static void sendblock(Urp*, int);
121: static void rcvack(Urp*, int);
122: static void flushinput(Urp*);
123: static void sendctl(Urp*, int);
124: static void sendack(Urp*);
125: static void sendrej(Urp*);
126: static void initoutput(Urp*, int);
127: static void initinput(Urp*);
128: static void urpkproc(void *arg);
129: static void urpvomit(char*, Urp*);
130: static void tryoutput(Urp*);
131:
132: Qinfo urpinfo =
133: {
134: urpciput,
135: urpoput,
136: urpopen,
137: urpclose,
138: "urp",
139: urpreset
140: };
141:
142: void
143: sturplink(void)
144: {
145: newqinfo(&urpinfo);
146: }
147:
148: static void
149: urpreset(void)
150: {
151: }
152:
153: static void
154: urpopen(Queue *q, Stream *s)
155: {
156: Urp *up;
157:
158: USED(s);
159: if(!urpkstarted){
160: qlock(&urpkl);
161: if(!urpkstarted){
162: urpkstarted = 1;
163: kproc("urpkproc", urpkproc, 0);
164: }
165: qunlock(&urpkl);
166: }
167:
168: /*
169: * find an unused urp structure
170: */
171: for(up = urpalloc.urp; up; up = up->list){
172: if(up->state == 0){
173: qlock(up);
174: if(up->state == 0)
175: break;
176: qunlock(up);
177: }
178: }
179: if(up == 0){
180: /*
181: * none available, create a new one, they are never freed
182: */
183: up = smalloc(sizeof(Urp));
184: qlock(up);
185: lock(&urpalloc);
186: up->list = urpalloc.urp;
187: urpalloc.urp = up;
188: unlock(&urpalloc);
189: }
190: q->ptr = q->other->ptr = up;
191: q->rp = &urpkr;
192: up->rq = q;
193: up->wq = q->other;
194: up->state = OPEN;
195: qunlock(up);
196: initinput(up);
197: initoutput(up, 0);
198: }
199:
200: /*
201: * Shut down the connection and kill off the kernel process
202: */
203: static int
204: isflushed(void *a)
205: {
206: Urp *up;
207:
208: up = (Urp *)a;
209: return (up->state&HUNGUP) || (up->unechoed==up->nxb && up->wq->len==0);
210: }
211: static void
212: urpclose(Queue *q)
213: {
214: Urp *up;
215: int i;
216:
217: up = (Urp *)q->ptr;
218: if(up == 0)
219: return;
220:
221: /*
222: * wait for all outstanding messages to drain, tell kernel
223: * process we're closing.
224: *
225: * if 2 minutes elapse, give it up
226: */
227: up->state |= CLOSING;
228: if(!waserror()){
229: tsleep(&up->r, isflushed, up, 2*60*1000);
230: poperror();
231: }
232: up->state |= HUNGUP;
233:
234: qlock(&up->xmit);
235: /*
236: * ack all outstanding messages
237: */
238: i = up->next - 1;
239: if(i < 0)
240: i = 7;
241: rcvack(up, ECHO+i);
242:
243: /*
244: * free all staged but unsent messages
245: */
246: for(i = 0; i < 7; i++){
247: qlock(&up->xl[i]);
248: if(up->xb[i]){
249: freeb(up->xb[i]);
250: up->xb[i] = 0;
251: }
252: qunlock(&up->xl[i]);
253: }
254: qunlock(&up->xmit);
255:
256: qlock(up);
257: up->state = 0;
258: qunlock(up);
259: }
260:
261: /*
262: * upstream control messages
263: */
264: static void
265: urpctliput(Urp *up, Queue *q, Block *bp)
266: {
267: switch(bp->type){
268: case M_HANGUP:
269: up->state |= HUNGUP;
270: wakeup(&up->r);
271: break;
272: }
273: PUTNEXT(q, bp);
274: }
275:
276: /*
277: * character mode input.
278: *
279: * the first byte in every message is a ctl byte (which belongs at the end).
280: */
281: void
282: urpciput(Queue *q, Block *bp)
283: {
284: Urp *up;
285: int i;
286: int ctl;
287:
288: up = (Urp *)q->ptr;
289: if(up == 0)
290: return;
291: if(bp->type != M_DATA){
292: urpctliput(up, q, bp);
293: return;
294: }
295:
296: /*
297: * get the control character
298: */
299: ctl = *bp->rptr++;
300: if(ctl < 0)
301: return;
302:
303: /*
304: * take care of any data
305: */
306: if(BLEN(bp)>0 && q->next->len<2*Streamhi && q->next->nb<2*Streambhi){
307: bp->flags |= S_DELIM;
308: urpstat.input += BLEN(bp);
309: PUTNEXT(q, bp);
310: } else
311: freeb(bp);
312:
313: /*
314: * handle the control character
315: */
316: switch(ctl){
317: case 0:
318: break;
319: case ENQ:
320: DPRINT("rENQ(c)\n");
321: urpstat.enqsr++;
322: sendctl(up, up->lastecho);
323: sendctl(up, ACK+up->iseq);
324: break;
325:
326: case CHECK:
327: DPRINT("rCHECK(c)\n");
328: sendctl(up, ACK+up->iseq);
329: break;
330:
331: case AINIT:
332: DPRINT("rAINIT(c)\n");
333: up->state &= ~INITING;
334: flushinput(up);
335: tryoutput(up);
336: break;
337:
338: case INIT0:
339: case INIT1:
340: DPRINT("rINIT%d(c)\n", ctl-INIT0);
341: sendctl(up, AINIT);
342: if(ctl == INIT1)
343: q->put = urpiput;
344: initinput(up);
345: break;
346:
347: case INITREQ:
348: DPRINT("rINITREQ(c)\n");
349: initoutput(up, 0);
350: break;
351:
352: case BREAK:
353: break;
354:
355: case REJ+0: case REJ+1: case REJ+2: case REJ+3:
356: case REJ+4: case REJ+5: case REJ+6: case REJ+7:
357: DPRINT("rREJ%d(c)\n", ctl-REJ);
358: rcvack(up, ctl);
359: break;
360:
361: case ACK+0: case ACK+1: case ACK+2: case ACK+3:
362: case ACK+4: case ACK+5: case ACK+6: case ACK+7:
363: case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3:
364: case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7:
365: DPRINT("%s%d(c)\n", (ctl&ECHO)?"rECHO":"rACK", ctl&7);
366: rcvack(up, ctl);
367: break;
368:
369: case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3:
370: case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7:
371: DPRINT("rSEQ%d(c)\n", ctl-SEQ);
372: qlock(&up->ack);
373: i = ctl & Nmask;
374: if(!QFULL(q->next))
375: sendctl(up, up->lastecho = ECHO+i);
376: up->iseq = i;
377: qunlock(&up->ack);
378: break;
379: }
380: }
381:
382: /*
383: * block mode input.
384: *
385: * the first byte in every message is a ctl byte (which belongs at the end).
386: *
387: * Simplifying assumption: one put == one message && the control byte
388: * is in the first block. If this isn't true, strange bytes will be
389: * used as control bytes.
390: *
391: * There's no input lock. The channel could be closed while we're
392: * processing a message.
393: */
394: void
395: urpiput(Queue *q, Block *bp)
396: {
397: Urp *up;
398: int i, len;
399: int ctl;
400:
401: up = (Urp *)q->ptr;
402: if(up == 0)
403: return;
404: if(bp->type != M_DATA){
405: urpctliput(up, q, bp);
406: return;
407: }
408:
409: /*
410: * get the control character
411: */
412: ctl = *bp->rptr++;
413:
414: /*
415: * take care of any block count(trx)
416: */
417: while(up->trx){
418: if(BLEN(bp)<=0)
419: break;
420: switch (up->trx) {
421: case 1:
422: case 2:
423: up->trbuf[up->trx++] = *bp->rptr++;
424: continue;
425: default:
426: up->trx = 0;
427: break;
428: }
429: }
430:
431: /*
432: * queue the block(s)
433: */
434: if(BLEN(bp) > 0){
435: bp->flags &= ~S_DELIM;
436: putq(q, bp);
437: if(q->len > 4*1024){
438: flushinput(up);
439: return;
440: }
441: } else
442: freeb(bp);
443:
444: /*
445: * handle the control character
446: */
447: switch(ctl){
448: case 0:
449: break;
450: case ENQ:
451: DPRINT("rENQ %d %uo %uo\n", up->blocks, up->lastecho, ACK+up->iseq);
452: up->blocks = 0;
453: urpstat.enqsr++;
454: sendctl(up, up->lastecho);
455: sendctl(up, ACK+up->iseq);
456: flushinput(up);
457: break;
458:
459: case CHECK:
460: DPRINT("rCHECK\n");
461: sendctl(up, ACK+up->iseq);
462: break;
463:
464: case AINIT:
465: DPRINT("rAINIT\n");
466: up->state &= ~INITING;
467: flushinput(up);
468: tryoutput(up);
469: break;
470:
471: case INIT0:
472: case INIT1:
473: DPRINT("rINIT%d\n", ctl-INIT0);
474: sendctl(up, AINIT);
475: if(ctl == INIT0)
476: q->put = urpciput;
477: initinput(up);
478: break;
479:
480: case INITREQ:
481: DPRINT("rINITREQ\n");
482: initoutput(up, 0);
483: break;
484:
485: case BREAK:
486: break;
487:
488: case BOT:
489: case BOTM:
490: case BOTS:
491: DPRINT("rBOT%c...", " MS"[ctl-BOT]);
492: up->trx = 1;
493: up->trbuf[0] = ctl;
494: break;
495:
496: case REJ+0: case REJ+1: case REJ+2: case REJ+3:
497: case REJ+4: case REJ+5: case REJ+6: case REJ+7:
498: DPRINT("rREJ%d\n", ctl-REJ);
499: rcvack(up, ctl);
500: break;
501:
502: case ACK+0: case ACK+1: case ACK+2: case ACK+3:
503: case ACK+4: case ACK+5: case ACK+6: case ACK+7:
504: case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3:
505: case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7:
506: DPRINT("%s%d\n", (ctl&ECHO)?"rECHO":"rACK", ctl&7);
507: rcvack(up, ctl);
508: break;
509:
510: /*
511: * if the sequence number is the next expected
512: * and the trailer length == 3
513: * and the block count matches the bytes received
514: * then send the bytes upstream.
515: */
516: case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3:
517: case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7:
518: len = up->trbuf[1] + (up->trbuf[2]<<8);
519: DPRINT("rSEQ%d(%d,%d,%d)...", ctl-SEQ, up->trx, len, q->len);
520: i = ctl & Nmask;
521: if(up->trx != 3){
522: urpstat.rjtrs++;
523: sendrej(up);
524: break;
525: }else if(q->len != len){
526: urpstat.rjpks++;
527: sendrej(up);
528: break;
529: }else if(i != ((up->iseq+1)&Nmask)){
530: urpstat.rjseq++;
531: sendrej(up);
532: break;
533: }else if(q->next->len > (3*Streamhi)/2
534: || q->next->nb > (3*Streambhi)/2){
535: DPRINT("next->len=%d, next->nb=%d\n",
536: q->next->len, q->next->nb);
537: flushinput(up);
538: break;
539: }
540: DPRINT("accept %d\n", q->len);
541:
542: /*
543: * send data upstream
544: */
545: if(q->first) {
546: if(up->trbuf[0] != BOTM)
547: q->last->flags |= S_DELIM;
548: while(bp = getq(q)){
549: urpstat.input += BLEN(bp);
550: PUTNEXT(q, bp);
551: }
552: } else {
553: bp = allocb(0);
554: if(up->trbuf[0] != BOTM)
555: bp->flags |= S_DELIM;
556: PUTNEXT(q, bp);
557: }
558: up->trx = 0;
559:
560: /*
561: * acknowledge receipt
562: */
563: qlock(&up->ack);
564: up->iseq = i;
565: if(!QFULL(q->next))
566: sendctl(up, up->lastecho = ECHO|i);
567: qunlock(&up->ack);
568: break;
569: }
570: }
571:
572: /*
573: * downstream control
574: */
575: Queue *trapq;
576: static void
577: urpctloput(Urp *up, Queue *q, Block *bp)
578: {
579: char *fields[2];
580: int outwin;
581:
582: switch(bp->type){
583: case M_CTL:
584: if(streamparse("break", bp)){
585: /*
586: * send a break as part of the data stream
587: */
588: urpstat.output++;
589: bp->wptr = bp->lim;
590: bp->rptr = bp->wptr - 1;
591: *bp->rptr = BREAK;
592: putq(q, bp);
593: output(up);
594: return;
595: }
596: if(streamparse("init", bp)){
597: outwin = strtoul((char*)bp->rptr, 0, 0);
598: initoutput(up, outwin);
599: freeb(bp);
600: return;
601: }
602: if(streamparse("debug", bp)){
603: switch(getfields((char *)bp->rptr, fields, 2, " ")){
604: case 1:
605: if (strcmp(fields[0], "on") == 0) {
606: q->flag |= QDEBUG;
607: q->other->flag |= QDEBUG;
608: }
609: if (strcmp(fields[0], "off") == 0) {
610: q->flag &= ~QDEBUG;
611: q->other->flag &= ~QDEBUG;
612: }
613: }
614: freeb(bp);
615: return;
616: }
617: if(streamparse("trap", bp)){
618: trapq = q;
619: return;
620: }
621: }
622: PUTNEXT(q, bp);
623: }
624:
625: /*
626: * accept data from a writer
627: */
628: static void
629: urpoput(Queue *q, Block *bp)
630: {
631: Urp *up;
632:
633: up = (Urp *)q->ptr;
634:
635: if(bp->type != M_DATA){
636: urpctloput(up, q, bp);
637: return;
638: }
639:
640: urpstat.output += BLEN(bp);
641: putq(q, bp);
642: output(up);
643: }
644:
645: /*
646: * start output
647: */
648: static void
649: output(Urp *up)
650: {
651: Block *bp, *nbp;
652: ulong now;
653: Queue *q;
654: int i;
655:
656: if(!canqlock(&up->xmit))
657: return;
658:
659: if(waserror()){
660: print("urp output error\n");
661: qunlock(&up->xmit);
662: nexterror();
663: }
664:
665: /*
666: * if still initing and it's time to rexmit, send an INIT1
667: */
668: now = NOW;
669: if(up->state & INITING){
670: if(now > up->timer){
671: q = up->wq;
672: DPRINT("INITING timer (%d, %d): ", now, up->timer);
673: sendctl(up, INIT1);
674: up->timer = now + MSrexmit;
675: }
676: goto out;
677: }
678:
679: /*
680: * fill the transmit buffers, `nxb' can never overtake `unechoed'
681: */
682: q = up->wq;
683: i = NEXT(up->nxb);
684: if(i != up->unechoed) {
685: for(bp = getq(q); bp && i!=up->unechoed; i = NEXT(i)){
686: if(up->xb[up->nxb] != 0)
687: urpvomit("output", up);
688: if(BLEN(bp) > up->maxblock){
689: nbp = up->xb[up->nxb] = allocb(0);
690: nbp->rptr = bp->rptr;
691: nbp->wptr = bp->rptr = bp->rptr + up->maxblock;
692: } else {
693: up->xb[up->nxb] = bp;
694: bp = getq(q);
695: }
696: up->nxb = i;
697: }
698: if(bp)
699: putbq(q, bp);
700: }
701:
702: /*
703: * retransmit cruft
704: */
705: if(up->rexmit){
706: /*
707: * if a retransmit is requested, move next back to
708: * the unacked blocks
709: */
710: urpstat.rexmit++;
711: up->rexmit = 0;
712: up->next = up->unacked;
713: } else if(up->unechoed!=up->next && NOW>up->timer){
714: /*
715: * if a retransmit time has elapsed since a transmit,
716: * send an ENQ
717: */
718: DPRINT("OUTPUT timer (%d, %d): ", NOW, up->timer);
719: up->timer = NOW + MSrexmit;
720: up->state &= ~REJECTING;
721: urpstat.enqsx++;
722: sendctl(up, ENQ);
723: goto out;
724: }
725:
726: /*
727: * if there's a window open, push some blocks out
728: *
729: * the lock is to synchronize with acknowledges that free
730: * blocks.
731: */
732: while(WINDOW(up)>0 && up->next!=up->nxb){
733: i = up->next;
734: qlock(&up->xl[i]);
735: if(waserror()){
736: qunlock(&up->xl[i]);
737: nexterror();
738: }
739: sendblock(up, i);
740: qunlock(&up->xl[i]);
741: up->next = NEXT(up->next);
742: poperror();
743: }
744: out:
745: qunlock(&up->xmit);
746: poperror();
747: }
748:
749: /*
750: * try output, this is called by an input process
751: */
752: void
753: tryoutput(Urp *up)
754: {
755: if(!waserror()){
756: output(up);
757: poperror();
758: }
759: }
760:
761: /*
762: * send a control byte, put the byte at the end of the allocated
763: * space in case a lower layer needs header room.
764: */
765: static void
766: sendctl(Urp *up, int ctl)
767: {
768: Block *bp;
769: Queue *q;
770:
771: q = up->wq;
772: if(QFULL(q->next))
773: return;
774: bp = allocb(1);
775: bp->wptr = bp->lim;
776: bp->rptr = bp->lim-1;
777: *bp->rptr = ctl;
778: bp->flags |= S_DELIM;
779: DPRINT("sCTL %ulx\n", ctl);
780: PUTNEXT(q, bp);
781: }
782:
783: /*
784: * send a reject
785: */
786: static void
787: sendrej(Urp *up)
788: {
789: Queue *q = up->wq;
790: flushinput(up);
791: qlock(&up->ack);
792: if((up->lastecho&~Nmask) == ECHO){
793: DPRINT("REJ %d\n", up->iseq);
794: sendctl(up, up->lastecho = REJ|up->iseq);
795: }
796: qunlock(&up->ack);
797: }
798:
799: /*
800: * send an acknowledge
801: */
802: static void
803: sendack(Urp *up)
804: {
805: /*
806: * check the precondition for acking
807: */
808: if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq)
809: return;
810:
811: if(!canqlock(&up->ack))
812: return;
813:
814: /*
815: * check again now that we've locked
816: */
817: if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq){
818: qunlock(&up->ack);
819: return;
820: }
821:
822: /*
823: * send the ack
824: */
825: { Queue *q = up->wq; DPRINT("sendack: "); }
826: sendctl(up, up->lastecho = ECHO|up->iseq);
827: qunlock(&up->ack);
828: }
829:
830: /*
831: * send a block.
832: */
833: static void
834: sendblock(Urp *up, int bn)
835: {
836: Block *bp, *m, *nbp;
837: int n;
838: Queue *q;
839:
840: q = up->wq;
841: up->timer = NOW + MSrexmit;
842: if(QFULL(q->next))
843: return;
844:
845: /*
846: * message 1, the BOT and the data
847: */
848: bp = up->xb[bn];
849: if(bp == 0)
850: return;
851: m = allocb(1);
852: m->rptr = m->lim - 1;
853: m->wptr = m->lim;
854: *m->rptr = (bp->flags & S_DELIM) ? BOT : BOTM;
855: nbp = allocb(0);
856: nbp->rptr = bp->rptr;
857: nbp->wptr = bp->wptr;
858: nbp->base = bp->base;
859: nbp->lim = bp->lim;
860: nbp->flags |= S_DELIM;
861: if(bp->type == M_CTL){
862: PUTNEXT(q, nbp);
863: m->flags |= S_DELIM;
864: PUTNEXT(q, m);
865: } else {
866: m->next = nbp;
867: PUTNEXT(q, m);
868: }
869:
870: /*
871: * message 2, the block length and the SEQ
872: */
873: m = allocb(3);
874: m->rptr = m->lim - 3;
875: m->wptr = m->lim;
876: n = BLEN(bp);
877: m->rptr[0] = SEQ | bn;
878: m->rptr[1] = n;
879: m->rptr[2] = n>>8;
880: m->flags |= S_DELIM;
881: PUTNEXT(q, m);
882: DPRINT("sb %d (%d)\n", bn, up->timer);
883: }
884:
885: /*
886: * receive an acknowledgement
887: */
888: static void
889: rcvack(Urp *up, int msg)
890: {
891: int seqno;
892: int next;
893: int i;
894:
895: seqno = msg&Nmask;
896: next = NEXT(seqno);
897:
898: /*
899: * release any acknowledged blocks
900: */
901: if(IN(seqno, up->unacked, up->next)){
902: for(; up->unacked != next; up->unacked = NEXT(up->unacked)){
903: i = up->unacked;
904: qlock(&up->xl[i]);
905: if(up->xb[i])
906: freeb(up->xb[i]);
907: up->xb[i] = 0;
908: qunlock(&up->xl[i]);
909: }
910: }
911:
912: switch(msg & 0370){
913: case ECHO:
914: if(IN(seqno, up->unechoed, up->next)) {
915: up->unechoed = next;
916: }
917: /*
918: * the next reject at the start of a window starts a
919: * retransmission.
920: */
921: up->state &= ~REJECTING;
922: break;
923: case REJ:
924: if(IN(seqno, up->unechoed, up->next))
925: up->unechoed = next;
926: /*
927: * ... FALL THROUGH ...
928: */
929: case ACK:
930: /*
931: * start a retransmission if we aren't retransmitting
932: * and this is the start of a window.
933: */
934: if(up->unechoed==next && !(up->state & REJECTING)){
935: up->state |= REJECTING;
936: up->rexmit = 1;
937: }
938: break;
939: }
940:
941: tryoutput(up);
942: if(up->state & CLOSING)
943: wakeup(&up->r);
944: }
945:
946: /*
947: * throw away any partially collected input
948: */
949: static void
950: flushinput(Urp *up)
951: {
952: Block *bp;
953:
954: while (bp = getq(up->rq))
955: freeb(bp);
956: up->trx = 0;
957: }
958:
959: /*
960: * initialize output
961: */
962: static void
963: initoutput(Urp *up, int window)
964: {
965: int i;
966:
967: /*
968: * set output window
969: */
970: up->maxblock = window/4;
971: if(up->maxblock < 64)
972: up->maxblock = 64;
973: up->maxblock -= 4;
974: up->maxout = 4;
975:
976: /*
977: * set sequence varialbles
978: */
979: up->unechoed = 1;
980: up->unacked = 1;
981: up->next = 1;
982: up->nxb = 1;
983: up->rexmit = 0;
984:
985: /*
986: * free any outstanding blocks
987: */
988: for(i = 0; i < 8; i++){
989: qlock(&up->xl[i]);
990: if(up->xb[i])
991: freeb(up->xb[i]);
992: up->xb[i] = 0;
993: qunlock(&up->xl[i]);
994: }
995:
996: /*
997: * tell the other side we've inited
998: */
999: up->state |= INITING;
1000: up->timer = NOW + MSrexmit;
1001: { Queue *q = up->wq; DPRINT("initoutput (%d): ", up->timer); }
1002: sendctl(up, INIT1);
1003: }
1004:
1005: /*
1006: * initialize input
1007: */
1008: static void
1009: initinput(Urp *up)
1010: {
1011: /*
1012: * restart all sequence parameters
1013: */
1014: up->blocks = 0;
1015: up->trx = 0;
1016: up->iseq = 0;
1017: up->lastecho = ECHO+0;
1018: flushinput(up);
1019: }
1020:
1021: static void
1022: urpkproc(void *arg)
1023: {
1024: Urp *up;
1025:
1026: USED(arg);
1027:
1028: if(waserror())
1029: ;
1030:
1031: for(;;){
1032: for(up = urpalloc.urp; up; up = up->list){
1033: if(up->state==0 || (up->state&HUNGUP))
1034: continue;
1035: if(!canqlock(up))
1036: continue;
1037: if(waserror()){
1038: qunlock(up);
1039: continue;
1040: }
1041: if(up->state==0 || (up->state&HUNGUP)){
1042: qunlock(up);
1043: poperror();
1044: continue;
1045: }
1046: if(up->iseq!=(up->lastecho&7) && !QFULL(up->rq->next))
1047: sendack(up);
1048: output(up);
1049: qunlock(up);
1050: poperror();
1051: }
1052: tsleep(&urpkr, return0, 0, 500);
1053: }
1054: }
1055:
1056: /*
1057: * urp got very confused, complain
1058: */
1059: static void
1060: urpvomit(char *msg, Urp* up)
1061: {
1062: print("urpvomit: %s %ux next %d unechoed %d unacked %d nxb %d\n",
1063: msg, up, up->next, up->unechoed, up->unacked, up->nxb);
1064: print("\txb: %ux %ux %ux %ux %ux %ux %ux %ux\n",
1065: up->xb[0], up->xb[1], up->xb[2], up->xb[3], up->xb[4],
1066: up->xb[5], up->xb[6], up->xb[7]);
1067: print("\tiseq: %uo lastecho: %uo trx: %d trbuf: %uo %uo %uo\n",
1068: up->iseq, up->lastecho, up->trx, up->trbuf[0], up->trbuf[1],
1069: up->trbuf[2]);
1070: print("\tupq: %ux %d %d\n", &up->rq->next->r, up->rq->next->nb,
1071: up->rq->next->len);
1072: }
1073:
1074: void
1075: urpfillstats(Chan *c, char *buf, int len)
1076: {
1077: char b[256];
1078:
1079: USED(c);
1080: sprint(b, "in: %d\nout: %d\nrexmit: %d\nrjtrs: %d\nrjpks: %d\nrjseq: %d\nenqsx: %d\nenqsr: %d\n",
1081: urpstat.input, urpstat.output, urpstat.rexmit, urpstat.rjtrs,
1082: urpstat.rjpks, urpstat.rjseq, urpstat.enqsx, urpstat.enqsr);
1083: strncpy(buf, b, len);
1084: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.