|
|
1.1 ! root 1: #include "u.h" ! 2: #include "../port/lib.h" ! 3: #include "mem.h" ! 4: #include "dat.h" ! 5: #include "fns.h" ! 6: #include "io.h" ! 7: #include "../port/error.h" ! 8: ! 9: enum { ! 10: MSrexmit= 1000, ! 11: Nmask= 0x7, ! 12: }; ! 13: ! 14: #define DPRINT if(q->flag&QDEBUG)kprint ! 15: ! 16: typedef struct Urp Urp; ! 17: ! 18: #define NOW (MACHP(0)->ticks*MS2HZ) ! 19: ! 20: /* ! 21: * URP status ! 22: */ ! 23: struct urpstat { ! 24: ulong input; /* bytes read from urp */ ! 25: ulong output; /* bytes output to urp */ ! 26: ulong rexmit; /* retransmit rejected urp msg */ ! 27: ulong rjtrs; /* reject, trailer size */ ! 28: ulong rjpks; /* reject, packet size */ ! 29: ulong rjseq; /* reject, sequence number */ ! 30: ulong levelb; /* unknown level b */ ! 31: ulong enqsx; /* enqs sent */ ! 32: ulong enqsr; /* enqs rcved */ ! 33: } urpstat; ! 34: ! 35: struct Urp { ! 36: QLock; ! 37: Urp *list; /* list of all urp structures */ ! 38: short state; /* flags */ ! 39: Rendez r; /* process waiting for output to finish */ ! 40: ! 41: /* input */ ! 42: QLock ack; /* ack lock */ ! 43: Queue *rq; /* input queue */ ! 44: uchar iseq; /* last good input sequence number */ ! 45: uchar lastecho; /* last echo/rej sent */ ! 46: uchar trbuf[3]; /* trailer being collected */ ! 47: short trx; /* # bytes in trailer being collected */ ! 48: int blocks; ! 49: ! 50: /* output */ ! 51: QLock xmit; /* output lock, only one process at a time */ ! 52: Queue *wq; /* output queue */ ! 53: int maxout; /* maximum outstanding unacked blocks */ ! 54: int maxblock; /* max block size */ ! 55: int next; /* next block to send */ ! 56: int unechoed; /* first unechoed block */ ! 57: int unacked; /* first unacked block */ ! 58: int nxb; /* next xb to use */ ! 59: Block *xb[8]; /* the xmit window buffer */ ! 60: QLock xl[8]; ! 61: ulong timer; /* timeout for xmit */ ! 62: int rexmit; ! 63: }; ! 64: ! 65: /* list of allocated urp structures (never freed) */ ! 66: struct ! 67: { ! 68: Lock; ! 69: Urp *urp; ! 70: } urpalloc; ! 71: ! 72: Rendez urpkr; ! 73: QLock urpkl; ! 74: int urpkstarted; ! 75: ! 76: #define WINDOW(u) ((u)->unechoed>(u)->next ? (u)->unechoed+(u)->maxout-(u)->next-8 :\ ! 77: (u)->unechoed+(u)->maxout-(u)->next) ! 78: #define IN(x, f, n) (f<=n ? (x>=f && x<n) : (x<n || x>=f)) ! 79: #define NEXT(x) (((x)+1)&Nmask) ! 80: ! 81: /* ! 82: * Protocol control bytes ! 83: */ ! 84: #define SEQ 0010 /* sequence number, ends trailers */ ! 85: #undef ECHO ! 86: #define ECHO 0020 /* echos, data given to next queue */ ! 87: #define REJ 0030 /* rejections, transmission error */ ! 88: #define ACK 0040 /* acknowledgments */ ! 89: #define BOT 0050 /* beginning of trailer */ ! 90: #define BOTM 0051 /* beginning of trailer, more data follows */ ! 91: #define BOTS 0052 /* seq update algorithm on this trailer */ ! 92: #define SOU 0053 /* start of unsequenced trailer */ ! 93: #define EOU 0054 /* end of unsequenced trailer */ ! 94: #define ENQ 0055 /* xmitter requests flow/error status */ ! 95: #define CHECK 0056 /* xmitter requests error status */ ! 96: #define INITREQ 0057 /* request initialization */ ! 97: #define INIT0 0060 /* disable trailer processing */ ! 98: #define INIT1 0061 /* enable trailer procesing */ ! 99: #define AINIT 0062 /* response to INIT0/INIT1 */ ! 100: #undef DELAY ! 101: #define DELAY 0100 /* real-time printing delay */ ! 102: #define BREAK 0110 /* Send/receive break (new style) */ ! 103: ! 104: #define REJECTING 0x1 ! 105: #define INITING 0x2 ! 106: #define HUNGUP 0x4 ! 107: #define OPEN 0x8 ! 108: #define CLOSING 0x10 ! 109: ! 110: /* ! 111: * predeclared ! 112: */ ! 113: static void urpreset(void); ! 114: static void urpciput(Queue*, Block*); ! 115: static void urpiput(Queue*, Block*); ! 116: static void urpoput(Queue*, Block*); ! 117: static void urpopen(Queue*, Stream*); ! 118: static void urpclose(Queue *); ! 119: static void output(Urp*); ! 120: static void sendblock(Urp*, int); ! 121: static void rcvack(Urp*, int); ! 122: static void flushinput(Urp*); ! 123: static void sendctl(Urp*, int); ! 124: static void sendack(Urp*); ! 125: static void sendrej(Urp*); ! 126: static void initoutput(Urp*, int); ! 127: static void initinput(Urp*); ! 128: static void urpkproc(void *arg); ! 129: static void urpvomit(char*, Urp*); ! 130: static void tryoutput(Urp*); ! 131: ! 132: Qinfo urpinfo = ! 133: { ! 134: urpciput, ! 135: urpoput, ! 136: urpopen, ! 137: urpclose, ! 138: "urp", ! 139: urpreset ! 140: }; ! 141: ! 142: void ! 143: sturplink(void) ! 144: { ! 145: newqinfo(&urpinfo); ! 146: } ! 147: ! 148: static void ! 149: urpreset(void) ! 150: { ! 151: } ! 152: ! 153: static void ! 154: urpopen(Queue *q, Stream *s) ! 155: { ! 156: Urp *up; ! 157: ! 158: USED(s); ! 159: if(!urpkstarted){ ! 160: qlock(&urpkl); ! 161: if(!urpkstarted){ ! 162: urpkstarted = 1; ! 163: kproc("urpkproc", urpkproc, 0); ! 164: } ! 165: qunlock(&urpkl); ! 166: } ! 167: ! 168: /* ! 169: * find an unused urp structure ! 170: */ ! 171: for(up = urpalloc.urp; up; up = up->list){ ! 172: if(up->state == 0){ ! 173: qlock(up); ! 174: if(up->state == 0) ! 175: break; ! 176: qunlock(up); ! 177: } ! 178: } ! 179: if(up == 0){ ! 180: /* ! 181: * none available, create a new one, they are never freed ! 182: */ ! 183: up = smalloc(sizeof(Urp)); ! 184: qlock(up); ! 185: lock(&urpalloc); ! 186: up->list = urpalloc.urp; ! 187: urpalloc.urp = up; ! 188: unlock(&urpalloc); ! 189: } ! 190: q->ptr = q->other->ptr = up; ! 191: q->rp = &urpkr; ! 192: up->rq = q; ! 193: up->wq = q->other; ! 194: up->state = OPEN; ! 195: qunlock(up); ! 196: initinput(up); ! 197: initoutput(up, 0); ! 198: } ! 199: ! 200: /* ! 201: * Shut down the connection and kill off the kernel process ! 202: */ ! 203: static int ! 204: isflushed(void *a) ! 205: { ! 206: Urp *up; ! 207: ! 208: up = (Urp *)a; ! 209: return (up->state&HUNGUP) || (up->unechoed==up->nxb && up->wq->len==0); ! 210: } ! 211: static void ! 212: urpclose(Queue *q) ! 213: { ! 214: Urp *up; ! 215: int i; ! 216: ! 217: up = (Urp *)q->ptr; ! 218: if(up == 0) ! 219: return; ! 220: ! 221: /* ! 222: * wait for all outstanding messages to drain, tell kernel ! 223: * process we're closing. ! 224: * ! 225: * if 2 minutes elapse, give it up ! 226: */ ! 227: up->state |= CLOSING; ! 228: if(!waserror()){ ! 229: tsleep(&up->r, isflushed, up, 2*60*1000); ! 230: poperror(); ! 231: } ! 232: up->state |= HUNGUP; ! 233: ! 234: qlock(&up->xmit); ! 235: /* ! 236: * ack all outstanding messages ! 237: */ ! 238: i = up->next - 1; ! 239: if(i < 0) ! 240: i = 7; ! 241: rcvack(up, ECHO+i); ! 242: ! 243: /* ! 244: * free all staged but unsent messages ! 245: */ ! 246: for(i = 0; i < 7; i++){ ! 247: qlock(&up->xl[i]); ! 248: if(up->xb[i]){ ! 249: freeb(up->xb[i]); ! 250: up->xb[i] = 0; ! 251: } ! 252: qunlock(&up->xl[i]); ! 253: } ! 254: qunlock(&up->xmit); ! 255: ! 256: qlock(up); ! 257: up->state = 0; ! 258: qunlock(up); ! 259: } ! 260: ! 261: /* ! 262: * upstream control messages ! 263: */ ! 264: static void ! 265: urpctliput(Urp *up, Queue *q, Block *bp) ! 266: { ! 267: switch(bp->type){ ! 268: case M_HANGUP: ! 269: up->state |= HUNGUP; ! 270: wakeup(&up->r); ! 271: break; ! 272: } ! 273: PUTNEXT(q, bp); ! 274: } ! 275: ! 276: /* ! 277: * character mode input. ! 278: * ! 279: * the first byte in every message is a ctl byte (which belongs at the end). ! 280: */ ! 281: void ! 282: urpciput(Queue *q, Block *bp) ! 283: { ! 284: Urp *up; ! 285: int i; ! 286: int ctl; ! 287: ! 288: up = (Urp *)q->ptr; ! 289: if(up == 0) ! 290: return; ! 291: if(bp->type != M_DATA){ ! 292: urpctliput(up, q, bp); ! 293: return; ! 294: } ! 295: ! 296: /* ! 297: * get the control character ! 298: */ ! 299: ctl = *bp->rptr++; ! 300: if(ctl < 0) ! 301: return; ! 302: ! 303: /* ! 304: * take care of any data ! 305: */ ! 306: if(BLEN(bp)>0 && q->next->len<2*Streamhi && q->next->nb<2*Streambhi){ ! 307: bp->flags |= S_DELIM; ! 308: urpstat.input += BLEN(bp); ! 309: PUTNEXT(q, bp); ! 310: } else ! 311: freeb(bp); ! 312: ! 313: /* ! 314: * handle the control character ! 315: */ ! 316: switch(ctl){ ! 317: case 0: ! 318: break; ! 319: case ENQ: ! 320: DPRINT("rENQ(c)\n"); ! 321: urpstat.enqsr++; ! 322: sendctl(up, up->lastecho); ! 323: sendctl(up, ACK+up->iseq); ! 324: break; ! 325: ! 326: case CHECK: ! 327: DPRINT("rCHECK(c)\n"); ! 328: sendctl(up, ACK+up->iseq); ! 329: break; ! 330: ! 331: case AINIT: ! 332: DPRINT("rAINIT(c)\n"); ! 333: up->state &= ~INITING; ! 334: flushinput(up); ! 335: tryoutput(up); ! 336: break; ! 337: ! 338: case INIT0: ! 339: case INIT1: ! 340: DPRINT("rINIT%d(c)\n", ctl-INIT0); ! 341: sendctl(up, AINIT); ! 342: if(ctl == INIT1) ! 343: q->put = urpiput; ! 344: initinput(up); ! 345: break; ! 346: ! 347: case INITREQ: ! 348: DPRINT("rINITREQ(c)\n"); ! 349: initoutput(up, 0); ! 350: break; ! 351: ! 352: case BREAK: ! 353: break; ! 354: ! 355: case REJ+0: case REJ+1: case REJ+2: case REJ+3: ! 356: case REJ+4: case REJ+5: case REJ+6: case REJ+7: ! 357: DPRINT("rREJ%d(c)\n", ctl-REJ); ! 358: rcvack(up, ctl); ! 359: break; ! 360: ! 361: case ACK+0: case ACK+1: case ACK+2: case ACK+3: ! 362: case ACK+4: case ACK+5: case ACK+6: case ACK+7: ! 363: case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3: ! 364: case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7: ! 365: DPRINT("%s%d(c)\n", (ctl&ECHO)?"rECHO":"rACK", ctl&7); ! 366: rcvack(up, ctl); ! 367: break; ! 368: ! 369: case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3: ! 370: case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7: ! 371: DPRINT("rSEQ%d(c)\n", ctl-SEQ); ! 372: qlock(&up->ack); ! 373: i = ctl & Nmask; ! 374: if(!QFULL(q->next)) ! 375: sendctl(up, up->lastecho = ECHO+i); ! 376: up->iseq = i; ! 377: qunlock(&up->ack); ! 378: break; ! 379: } ! 380: } ! 381: ! 382: /* ! 383: * block mode input. ! 384: * ! 385: * the first byte in every message is a ctl byte (which belongs at the end). ! 386: * ! 387: * Simplifying assumption: one put == one message && the control byte ! 388: * is in the first block. If this isn't true, strange bytes will be ! 389: * used as control bytes. ! 390: * ! 391: * There's no input lock. The channel could be closed while we're ! 392: * processing a message. ! 393: */ ! 394: void ! 395: urpiput(Queue *q, Block *bp) ! 396: { ! 397: Urp *up; ! 398: int i, len; ! 399: int ctl; ! 400: ! 401: up = (Urp *)q->ptr; ! 402: if(up == 0) ! 403: return; ! 404: if(bp->type != M_DATA){ ! 405: urpctliput(up, q, bp); ! 406: return; ! 407: } ! 408: ! 409: /* ! 410: * get the control character ! 411: */ ! 412: ctl = *bp->rptr++; ! 413: ! 414: /* ! 415: * take care of any block count(trx) ! 416: */ ! 417: while(up->trx){ ! 418: if(BLEN(bp)<=0) ! 419: break; ! 420: switch (up->trx) { ! 421: case 1: ! 422: case 2: ! 423: up->trbuf[up->trx++] = *bp->rptr++; ! 424: continue; ! 425: default: ! 426: up->trx = 0; ! 427: break; ! 428: } ! 429: } ! 430: ! 431: /* ! 432: * queue the block(s) ! 433: */ ! 434: if(BLEN(bp) > 0){ ! 435: bp->flags &= ~S_DELIM; ! 436: putq(q, bp); ! 437: if(q->len > 4*1024){ ! 438: flushinput(up); ! 439: return; ! 440: } ! 441: } else ! 442: freeb(bp); ! 443: ! 444: /* ! 445: * handle the control character ! 446: */ ! 447: switch(ctl){ ! 448: case 0: ! 449: break; ! 450: case ENQ: ! 451: DPRINT("rENQ %d %uo %uo\n", up->blocks, up->lastecho, ACK+up->iseq); ! 452: up->blocks = 0; ! 453: urpstat.enqsr++; ! 454: sendctl(up, up->lastecho); ! 455: sendctl(up, ACK+up->iseq); ! 456: flushinput(up); ! 457: break; ! 458: ! 459: case CHECK: ! 460: DPRINT("rCHECK\n"); ! 461: sendctl(up, ACK+up->iseq); ! 462: break; ! 463: ! 464: case AINIT: ! 465: DPRINT("rAINIT\n"); ! 466: up->state &= ~INITING; ! 467: flushinput(up); ! 468: tryoutput(up); ! 469: break; ! 470: ! 471: case INIT0: ! 472: case INIT1: ! 473: DPRINT("rINIT%d\n", ctl-INIT0); ! 474: sendctl(up, AINIT); ! 475: if(ctl == INIT0) ! 476: q->put = urpciput; ! 477: initinput(up); ! 478: break; ! 479: ! 480: case INITREQ: ! 481: DPRINT("rINITREQ\n"); ! 482: initoutput(up, 0); ! 483: break; ! 484: ! 485: case BREAK: ! 486: break; ! 487: ! 488: case BOT: ! 489: case BOTM: ! 490: case BOTS: ! 491: DPRINT("rBOT%c...", " MS"[ctl-BOT]); ! 492: up->trx = 1; ! 493: up->trbuf[0] = ctl; ! 494: break; ! 495: ! 496: case REJ+0: case REJ+1: case REJ+2: case REJ+3: ! 497: case REJ+4: case REJ+5: case REJ+6: case REJ+7: ! 498: DPRINT("rREJ%d\n", ctl-REJ); ! 499: rcvack(up, ctl); ! 500: break; ! 501: ! 502: case ACK+0: case ACK+1: case ACK+2: case ACK+3: ! 503: case ACK+4: case ACK+5: case ACK+6: case ACK+7: ! 504: case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3: ! 505: case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7: ! 506: DPRINT("%s%d\n", (ctl&ECHO)?"rECHO":"rACK", ctl&7); ! 507: rcvack(up, ctl); ! 508: break; ! 509: ! 510: /* ! 511: * if the sequence number is the next expected ! 512: * and the trailer length == 3 ! 513: * and the block count matches the bytes received ! 514: * then send the bytes upstream. ! 515: */ ! 516: case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3: ! 517: case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7: ! 518: len = up->trbuf[1] + (up->trbuf[2]<<8); ! 519: DPRINT("rSEQ%d(%d,%d,%d)...", ctl-SEQ, up->trx, len, q->len); ! 520: i = ctl & Nmask; ! 521: if(up->trx != 3){ ! 522: urpstat.rjtrs++; ! 523: sendrej(up); ! 524: break; ! 525: }else if(q->len != len){ ! 526: urpstat.rjpks++; ! 527: sendrej(up); ! 528: break; ! 529: }else if(i != ((up->iseq+1)&Nmask)){ ! 530: urpstat.rjseq++; ! 531: sendrej(up); ! 532: break; ! 533: }else if(q->next->len > (3*Streamhi)/2 ! 534: || q->next->nb > (3*Streambhi)/2){ ! 535: DPRINT("next->len=%d, next->nb=%d\n", ! 536: q->next->len, q->next->nb); ! 537: flushinput(up); ! 538: break; ! 539: } ! 540: DPRINT("accept %d\n", q->len); ! 541: ! 542: /* ! 543: * send data upstream ! 544: */ ! 545: if(q->first) { ! 546: if(up->trbuf[0] != BOTM) ! 547: q->last->flags |= S_DELIM; ! 548: while(bp = getq(q)){ ! 549: urpstat.input += BLEN(bp); ! 550: PUTNEXT(q, bp); ! 551: } ! 552: } else { ! 553: bp = allocb(0); ! 554: if(up->trbuf[0] != BOTM) ! 555: bp->flags |= S_DELIM; ! 556: PUTNEXT(q, bp); ! 557: } ! 558: up->trx = 0; ! 559: ! 560: /* ! 561: * acknowledge receipt ! 562: */ ! 563: qlock(&up->ack); ! 564: up->iseq = i; ! 565: if(!QFULL(q->next)) ! 566: sendctl(up, up->lastecho = ECHO|i); ! 567: qunlock(&up->ack); ! 568: break; ! 569: } ! 570: } ! 571: ! 572: /* ! 573: * downstream control ! 574: */ ! 575: Queue *trapq; ! 576: static void ! 577: urpctloput(Urp *up, Queue *q, Block *bp) ! 578: { ! 579: char *fields[2]; ! 580: int outwin; ! 581: ! 582: switch(bp->type){ ! 583: case M_CTL: ! 584: if(streamparse("break", bp)){ ! 585: /* ! 586: * send a break as part of the data stream ! 587: */ ! 588: urpstat.output++; ! 589: bp->wptr = bp->lim; ! 590: bp->rptr = bp->wptr - 1; ! 591: *bp->rptr = BREAK; ! 592: putq(q, bp); ! 593: output(up); ! 594: return; ! 595: } ! 596: if(streamparse("init", bp)){ ! 597: outwin = strtoul((char*)bp->rptr, 0, 0); ! 598: initoutput(up, outwin); ! 599: freeb(bp); ! 600: return; ! 601: } ! 602: if(streamparse("debug", bp)){ ! 603: switch(getfields((char *)bp->rptr, fields, 2, " ")){ ! 604: case 1: ! 605: if (strcmp(fields[0], "on") == 0) { ! 606: q->flag |= QDEBUG; ! 607: q->other->flag |= QDEBUG; ! 608: } ! 609: if (strcmp(fields[0], "off") == 0) { ! 610: q->flag &= ~QDEBUG; ! 611: q->other->flag &= ~QDEBUG; ! 612: } ! 613: } ! 614: freeb(bp); ! 615: return; ! 616: } ! 617: if(streamparse("trap", bp)){ ! 618: trapq = q; ! 619: return; ! 620: } ! 621: } ! 622: PUTNEXT(q, bp); ! 623: } ! 624: ! 625: /* ! 626: * accept data from a writer ! 627: */ ! 628: static void ! 629: urpoput(Queue *q, Block *bp) ! 630: { ! 631: Urp *up; ! 632: ! 633: up = (Urp *)q->ptr; ! 634: ! 635: if(bp->type != M_DATA){ ! 636: urpctloput(up, q, bp); ! 637: return; ! 638: } ! 639: ! 640: urpstat.output += BLEN(bp); ! 641: putq(q, bp); ! 642: output(up); ! 643: } ! 644: ! 645: /* ! 646: * start output ! 647: */ ! 648: static void ! 649: output(Urp *up) ! 650: { ! 651: Block *bp, *nbp; ! 652: ulong now; ! 653: Queue *q; ! 654: int i; ! 655: ! 656: if(!canqlock(&up->xmit)) ! 657: return; ! 658: ! 659: if(waserror()){ ! 660: print("urp output error\n"); ! 661: qunlock(&up->xmit); ! 662: nexterror(); ! 663: } ! 664: ! 665: /* ! 666: * if still initing and it's time to rexmit, send an INIT1 ! 667: */ ! 668: now = NOW; ! 669: if(up->state & INITING){ ! 670: if(now > up->timer){ ! 671: q = up->wq; ! 672: DPRINT("INITING timer (%d, %d): ", now, up->timer); ! 673: sendctl(up, INIT1); ! 674: up->timer = now + MSrexmit; ! 675: } ! 676: goto out; ! 677: } ! 678: ! 679: /* ! 680: * fill the transmit buffers, `nxb' can never overtake `unechoed' ! 681: */ ! 682: q = up->wq; ! 683: i = NEXT(up->nxb); ! 684: if(i != up->unechoed) { ! 685: for(bp = getq(q); bp && i!=up->unechoed; i = NEXT(i)){ ! 686: if(up->xb[up->nxb] != 0) ! 687: urpvomit("output", up); ! 688: if(BLEN(bp) > up->maxblock){ ! 689: nbp = up->xb[up->nxb] = allocb(0); ! 690: nbp->rptr = bp->rptr; ! 691: nbp->wptr = bp->rptr = bp->rptr + up->maxblock; ! 692: } else { ! 693: up->xb[up->nxb] = bp; ! 694: bp = getq(q); ! 695: } ! 696: up->nxb = i; ! 697: } ! 698: if(bp) ! 699: putbq(q, bp); ! 700: } ! 701: ! 702: /* ! 703: * retransmit cruft ! 704: */ ! 705: if(up->rexmit){ ! 706: /* ! 707: * if a retransmit is requested, move next back to ! 708: * the unacked blocks ! 709: */ ! 710: urpstat.rexmit++; ! 711: up->rexmit = 0; ! 712: up->next = up->unacked; ! 713: } else if(up->unechoed!=up->next && NOW>up->timer){ ! 714: /* ! 715: * if a retransmit time has elapsed since a transmit, ! 716: * send an ENQ ! 717: */ ! 718: DPRINT("OUTPUT timer (%d, %d): ", NOW, up->timer); ! 719: up->timer = NOW + MSrexmit; ! 720: up->state &= ~REJECTING; ! 721: urpstat.enqsx++; ! 722: sendctl(up, ENQ); ! 723: goto out; ! 724: } ! 725: ! 726: /* ! 727: * if there's a window open, push some blocks out ! 728: * ! 729: * the lock is to synchronize with acknowledges that free ! 730: * blocks. ! 731: */ ! 732: while(WINDOW(up)>0 && up->next!=up->nxb){ ! 733: i = up->next; ! 734: qlock(&up->xl[i]); ! 735: if(waserror()){ ! 736: qunlock(&up->xl[i]); ! 737: nexterror(); ! 738: } ! 739: sendblock(up, i); ! 740: qunlock(&up->xl[i]); ! 741: up->next = NEXT(up->next); ! 742: poperror(); ! 743: } ! 744: out: ! 745: qunlock(&up->xmit); ! 746: poperror(); ! 747: } ! 748: ! 749: /* ! 750: * try output, this is called by an input process ! 751: */ ! 752: void ! 753: tryoutput(Urp *up) ! 754: { ! 755: if(!waserror()){ ! 756: output(up); ! 757: poperror(); ! 758: } ! 759: } ! 760: ! 761: /* ! 762: * send a control byte, put the byte at the end of the allocated ! 763: * space in case a lower layer needs header room. ! 764: */ ! 765: static void ! 766: sendctl(Urp *up, int ctl) ! 767: { ! 768: Block *bp; ! 769: Queue *q; ! 770: ! 771: q = up->wq; ! 772: if(QFULL(q->next)) ! 773: return; ! 774: bp = allocb(1); ! 775: bp->wptr = bp->lim; ! 776: bp->rptr = bp->lim-1; ! 777: *bp->rptr = ctl; ! 778: bp->flags |= S_DELIM; ! 779: DPRINT("sCTL %ulx\n", ctl); ! 780: PUTNEXT(q, bp); ! 781: } ! 782: ! 783: /* ! 784: * send a reject ! 785: */ ! 786: static void ! 787: sendrej(Urp *up) ! 788: { ! 789: Queue *q = up->wq; ! 790: flushinput(up); ! 791: qlock(&up->ack); ! 792: if((up->lastecho&~Nmask) == ECHO){ ! 793: DPRINT("REJ %d\n", up->iseq); ! 794: sendctl(up, up->lastecho = REJ|up->iseq); ! 795: } ! 796: qunlock(&up->ack); ! 797: } ! 798: ! 799: /* ! 800: * send an acknowledge ! 801: */ ! 802: static void ! 803: sendack(Urp *up) ! 804: { ! 805: /* ! 806: * check the precondition for acking ! 807: */ ! 808: if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq) ! 809: return; ! 810: ! 811: if(!canqlock(&up->ack)) ! 812: return; ! 813: ! 814: /* ! 815: * check again now that we've locked ! 816: */ ! 817: if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq){ ! 818: qunlock(&up->ack); ! 819: return; ! 820: } ! 821: ! 822: /* ! 823: * send the ack ! 824: */ ! 825: { Queue *q = up->wq; DPRINT("sendack: "); } ! 826: sendctl(up, up->lastecho = ECHO|up->iseq); ! 827: qunlock(&up->ack); ! 828: } ! 829: ! 830: /* ! 831: * send a block. ! 832: */ ! 833: static void ! 834: sendblock(Urp *up, int bn) ! 835: { ! 836: Block *bp, *m, *nbp; ! 837: int n; ! 838: Queue *q; ! 839: ! 840: q = up->wq; ! 841: up->timer = NOW + MSrexmit; ! 842: if(QFULL(q->next)) ! 843: return; ! 844: ! 845: /* ! 846: * message 1, the BOT and the data ! 847: */ ! 848: bp = up->xb[bn]; ! 849: if(bp == 0) ! 850: return; ! 851: m = allocb(1); ! 852: m->rptr = m->lim - 1; ! 853: m->wptr = m->lim; ! 854: *m->rptr = (bp->flags & S_DELIM) ? BOT : BOTM; ! 855: nbp = allocb(0); ! 856: nbp->rptr = bp->rptr; ! 857: nbp->wptr = bp->wptr; ! 858: nbp->base = bp->base; ! 859: nbp->lim = bp->lim; ! 860: nbp->flags |= S_DELIM; ! 861: if(bp->type == M_CTL){ ! 862: PUTNEXT(q, nbp); ! 863: m->flags |= S_DELIM; ! 864: PUTNEXT(q, m); ! 865: } else { ! 866: m->next = nbp; ! 867: PUTNEXT(q, m); ! 868: } ! 869: ! 870: /* ! 871: * message 2, the block length and the SEQ ! 872: */ ! 873: m = allocb(3); ! 874: m->rptr = m->lim - 3; ! 875: m->wptr = m->lim; ! 876: n = BLEN(bp); ! 877: m->rptr[0] = SEQ | bn; ! 878: m->rptr[1] = n; ! 879: m->rptr[2] = n>>8; ! 880: m->flags |= S_DELIM; ! 881: PUTNEXT(q, m); ! 882: DPRINT("sb %d (%d)\n", bn, up->timer); ! 883: } ! 884: ! 885: /* ! 886: * receive an acknowledgement ! 887: */ ! 888: static void ! 889: rcvack(Urp *up, int msg) ! 890: { ! 891: int seqno; ! 892: int next; ! 893: int i; ! 894: ! 895: seqno = msg&Nmask; ! 896: next = NEXT(seqno); ! 897: ! 898: /* ! 899: * release any acknowledged blocks ! 900: */ ! 901: if(IN(seqno, up->unacked, up->next)){ ! 902: for(; up->unacked != next; up->unacked = NEXT(up->unacked)){ ! 903: i = up->unacked; ! 904: qlock(&up->xl[i]); ! 905: if(up->xb[i]) ! 906: freeb(up->xb[i]); ! 907: up->xb[i] = 0; ! 908: qunlock(&up->xl[i]); ! 909: } ! 910: } ! 911: ! 912: switch(msg & 0370){ ! 913: case ECHO: ! 914: if(IN(seqno, up->unechoed, up->next)) { ! 915: up->unechoed = next; ! 916: } ! 917: /* ! 918: * the next reject at the start of a window starts a ! 919: * retransmission. ! 920: */ ! 921: up->state &= ~REJECTING; ! 922: break; ! 923: case REJ: ! 924: if(IN(seqno, up->unechoed, up->next)) ! 925: up->unechoed = next; ! 926: /* ! 927: * ... FALL THROUGH ... ! 928: */ ! 929: case ACK: ! 930: /* ! 931: * start a retransmission if we aren't retransmitting ! 932: * and this is the start of a window. ! 933: */ ! 934: if(up->unechoed==next && !(up->state & REJECTING)){ ! 935: up->state |= REJECTING; ! 936: up->rexmit = 1; ! 937: } ! 938: break; ! 939: } ! 940: ! 941: tryoutput(up); ! 942: if(up->state & CLOSING) ! 943: wakeup(&up->r); ! 944: } ! 945: ! 946: /* ! 947: * throw away any partially collected input ! 948: */ ! 949: static void ! 950: flushinput(Urp *up) ! 951: { ! 952: Block *bp; ! 953: ! 954: while (bp = getq(up->rq)) ! 955: freeb(bp); ! 956: up->trx = 0; ! 957: } ! 958: ! 959: /* ! 960: * initialize output ! 961: */ ! 962: static void ! 963: initoutput(Urp *up, int window) ! 964: { ! 965: int i; ! 966: ! 967: /* ! 968: * set output window ! 969: */ ! 970: up->maxblock = window/4; ! 971: if(up->maxblock < 64) ! 972: up->maxblock = 64; ! 973: up->maxblock -= 4; ! 974: up->maxout = 4; ! 975: ! 976: /* ! 977: * set sequence varialbles ! 978: */ ! 979: up->unechoed = 1; ! 980: up->unacked = 1; ! 981: up->next = 1; ! 982: up->nxb = 1; ! 983: up->rexmit = 0; ! 984: ! 985: /* ! 986: * free any outstanding blocks ! 987: */ ! 988: for(i = 0; i < 8; i++){ ! 989: qlock(&up->xl[i]); ! 990: if(up->xb[i]) ! 991: freeb(up->xb[i]); ! 992: up->xb[i] = 0; ! 993: qunlock(&up->xl[i]); ! 994: } ! 995: ! 996: /* ! 997: * tell the other side we've inited ! 998: */ ! 999: up->state |= INITING; ! 1000: up->timer = NOW + MSrexmit; ! 1001: { Queue *q = up->wq; DPRINT("initoutput (%d): ", up->timer); } ! 1002: sendctl(up, INIT1); ! 1003: } ! 1004: ! 1005: /* ! 1006: * initialize input ! 1007: */ ! 1008: static void ! 1009: initinput(Urp *up) ! 1010: { ! 1011: /* ! 1012: * restart all sequence parameters ! 1013: */ ! 1014: up->blocks = 0; ! 1015: up->trx = 0; ! 1016: up->iseq = 0; ! 1017: up->lastecho = ECHO+0; ! 1018: flushinput(up); ! 1019: } ! 1020: ! 1021: static void ! 1022: urpkproc(void *arg) ! 1023: { ! 1024: Urp *up; ! 1025: ! 1026: USED(arg); ! 1027: ! 1028: if(waserror()) ! 1029: ; ! 1030: ! 1031: for(;;){ ! 1032: for(up = urpalloc.urp; up; up = up->list){ ! 1033: if(up->state==0 || (up->state&HUNGUP)) ! 1034: continue; ! 1035: if(!canqlock(up)) ! 1036: continue; ! 1037: if(waserror()){ ! 1038: qunlock(up); ! 1039: continue; ! 1040: } ! 1041: if(up->state==0 || (up->state&HUNGUP)){ ! 1042: qunlock(up); ! 1043: poperror(); ! 1044: continue; ! 1045: } ! 1046: if(up->iseq!=(up->lastecho&7) && !QFULL(up->rq->next)) ! 1047: sendack(up); ! 1048: output(up); ! 1049: qunlock(up); ! 1050: poperror(); ! 1051: } ! 1052: tsleep(&urpkr, return0, 0, 500); ! 1053: } ! 1054: } ! 1055: ! 1056: /* ! 1057: * urp got very confused, complain ! 1058: */ ! 1059: static void ! 1060: urpvomit(char *msg, Urp* up) ! 1061: { ! 1062: print("urpvomit: %s %ux next %d unechoed %d unacked %d nxb %d\n", ! 1063: msg, up, up->next, up->unechoed, up->unacked, up->nxb); ! 1064: print("\txb: %ux %ux %ux %ux %ux %ux %ux %ux\n", ! 1065: up->xb[0], up->xb[1], up->xb[2], up->xb[3], up->xb[4], ! 1066: up->xb[5], up->xb[6], up->xb[7]); ! 1067: print("\tiseq: %uo lastecho: %uo trx: %d trbuf: %uo %uo %uo\n", ! 1068: up->iseq, up->lastecho, up->trx, up->trbuf[0], up->trbuf[1], ! 1069: up->trbuf[2]); ! 1070: print("\tupq: %ux %d %d\n", &up->rq->next->r, up->rq->next->nb, ! 1071: up->rq->next->len); ! 1072: } ! 1073: ! 1074: void ! 1075: urpfillstats(Chan *c, char *buf, int len) ! 1076: { ! 1077: char b[256]; ! 1078: ! 1079: USED(c); ! 1080: sprint(b, "in: %d\nout: %d\nrexmit: %d\nrjtrs: %d\nrjpks: %d\nrjseq: %d\nenqsx: %d\nenqsr: %d\n", ! 1081: urpstat.input, urpstat.output, urpstat.rexmit, urpstat.rjtrs, ! 1082: urpstat.rjpks, urpstat.rjseq, urpstat.enqsx, urpstat.enqsr); ! 1083: strncpy(buf, b, len); ! 1084: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.