|
|
1.1 ! root 1: #include "u.h" ! 2: #include "lib.h" ! 3: #include "mem.h" ! 4: #include "dat.h" ! 5: #include "fns.h" ! 6: #include "io.h" ! 7: #include "errno.h" ! 8: ! 9: #define NOW (MACHP(0)->ticks*MS2HZ) ! 10: ! 11: enum { ! 12: /* ! 13: * configuration parameters ! 14: */ ! 15: Nconv = 32, /* total number of active circuits */ ! 16: Nifc = 2, /* number of interfaces */ ! 17: MSrexmit = 300, /* retranmission interval in ms */ ! 18: MSack = 50, /* ms to sit on an ack */ ! 19: ! 20: /* ! 21: * relative or immutable ! 22: */ ! 23: Ndir = Nconv + 1, /* entries in the nonet directory */ ! 24: Nsubdir = 2, /* entries in the nonet directory */ ! 25: Nmsg = 128, /* max number of outstanding messages */ ! 26: Nmask = Nmsg - 1, /* mask for log(Nmsg) bits */ ! 27: }; ! 28: ! 29: typedef struct Hdr Hdr; ! 30: typedef struct Msg Msg; ! 31: typedef struct Conversation Conversation; ! 32: typedef struct Interface Interface; ! 33: typedef struct Etherhdr Etherhdr; ! 34: ! 35: /* ! 36: * generic nonet header ! 37: */ ! 38: struct Hdr { ! 39: uchar circuit[3]; /* circuit number */ ! 40: uchar flag; ! 41: uchar mid; /* message id */ ! 42: uchar ack; /* piggy back ack */ ! 43: uchar remain[2]; /* count of remaing bytes of data */ ! 44: uchar sum[2]; /* checksum (0 means none) */ ! 45: }; ! 46: #define HDRSIZE 10 ! 47: #define NEWCALL 0x1 /* flag bit marking a new circuit */ ! 48: #define HANGUP 0x2 /* flag bit requesting hangup */ ! 49: #define ACKME 0x4 /* acknowledge this message */ ! 50: ! 51: /* ! 52: * a buffer describing a nonet message ! 53: */ ! 54: struct Msg { ! 55: QLock; ! 56: Blist; ! 57: int mid; /* sequence number */ ! 58: int rem; /* remaining */ ! 59: long time; ! 60: int acked; ! 61: }; ! 62: ! 63: /* ! 64: * Nonet conversation states (for Conversation.state) ! 65: */ ! 66: enum { ! 67: Cclosed, ! 68: Copen, ! 69: Clistening, ! 70: Cconnected, ! 71: Cconnecting, ! 72: Chungup, ! 73: Cclosing, ! 74: }; ! 75: ! 76: /* ! 77: * one exists for each Nonet conversation. ! 78: */ ! 79: struct Conversation { ! 80: QLock; ! 81: ! 82: Queue *rq; /* input queue */ ! 83: int version; /* incremented each time struct is changed */ ! 84: int state; /* true if listening */ ! 85: ! 86: Msg in[Nmsg]; /* messages being received */ ! 87: int rcvcircuit; /* circuit number of incoming packets */ ! 88: ! 89: uchar ack[Nmsg]; /* acknowledgements waiting to be sent */ ! 90: long atime[Nmsg]; ! 91: int afirst; ! 92: int anext; ! 93: ! 94: QLock xlock; /* one trasmitter at a time */ ! 95: Rendez r; /* process waiting for an output mid */ ! 96: Msg ctl; /* for control messages */ ! 97: Msg out[Nmsg]; /* messages being sent */ ! 98: int first; /* first unacknowledged message */ ! 99: int next; /* next message buffer to use */ ! 100: int lastacked; /* last message acked */ ! 101: Block *media; /* prototype media output header */ ! 102: Hdr *hdr; /* nonet header inside of media header */ ! 103: ! 104: Interface *ifc; ! 105: int kstarted; ! 106: char raddr[64]; /* remote address */ ! 107: int rexmit; /* statistics */ ! 108: int retry; ! 109: int bad; ! 110: int sent; ! 111: int rcvd; ! 112: }; ! 113: ! 114: /* ! 115: * a nonet interface. one exists for every stream that a ! 116: * nonet multiplexor is pushed onto. ! 117: */ ! 118: struct Interface { ! 119: QLock; ! 120: int ref; ! 121: char name[64]; /* interface name */ ! 122: int maxtu; /* maximum transfer unit */ ! 123: int mintu; /* minimum transfer unit */ ! 124: int hsize; /* media header size */ ! 125: Queue *wq; /* interface output queue */ ! 126: void (*connect)(Conversation *, char *); ! 127: Conversation conv[Nconv]; ! 128: }; ! 129: static Interface interface[Nifc]; ! 130: ! 131: void nonetkproc(void *); ! 132: int cksum(Block*, int); ! 133: extern Qinfo noetherinfo; ! 134: ! 135: /* ! 136: * start a new conversation. start an ack/retransmit process if ! 137: * none already exists for this circuit. ! 138: */ ! 139: static void ! 140: startconv(Conversation *cp, int circuit, int state) ! 141: { ! 142: int i; ! 143: char name[32]; ! 144: Interface *ifc; ! 145: ! 146: ifc = cp->ifc; ! 147: ! 148: /* ! 149: * allocate the prototype header ! 150: */ ! 151: cp->media = allocb(ifc->hsize + HDRSIZE); ! 152: cp->media->wptr += ifc->hsize + HDRSIZE; ! 153: cp->hdr = (Hdr *)(cp->media->rptr + ifc->hsize); ! 154: ! 155: /* ! 156: * fill in the circuit number ! 157: */ ! 158: cp->hdr->flag = NEWCALL|ACKME; ! 159: cp->hdr->circuit[2] = circuit>>16; ! 160: cp->hdr->circuit[1] = circuit>>8; ! 161: cp->hdr->circuit[0] = circuit; ! 162: ! 163: /* ! 164: * set the state variables ! 165: */ ! 166: cp->state = state; ! 167: cp->retry = 0; ! 168: for(i = 1; i < Nmsg; i++){ ! 169: cp->in[i].mid = i; ! 170: cp->in[i].acked = 0; ! 171: cp->in[i].rem = 0; ! 172: cp->out[i].mid = i | Nmsg; ! 173: cp->out[i].acked = 1; ! 174: cp->out[i].rem = 0; ! 175: } ! 176: cp->in[0].mid = Nmsg; ! 177: cp->in[0].acked = 0; ! 178: cp->in[0].rem = 0; ! 179: cp->out[0].mid = 0; ! 180: cp->out[0].acked = 1; ! 181: cp->out[0].rem = 0; ! 182: cp->first = cp->next = 1; ! 183: cp->rexmit = cp->bad = cp->sent = cp->rcvd = cp->lastacked = 0; ! 184: ! 185: /* ! 186: * used for demultiplexing ! 187: */ ! 188: cp->rcvcircuit = circuit ^ 1; ! 189: ! 190: /* ! 191: * start the ack/rexmit process ! 192: */ ! 193: if(cp->kstarted == 0){ ! 194: cp->kstarted = 1; ! 195: sprint(name, "**nonet%d**", cp - ifc->conv); ! 196: kproc(name, nonetkproc, cp); ! 197: } ! 198: } ! 199: ! 200: /* ! 201: * connect to the destination whose name is pointed to by bp->rptr. ! 202: */ ! 203: void ! 204: connect(Conversation *cp, Block *bp) ! 205: { ! 206: Interface *ifc; ! 207: char *iname; ! 208: int len; ! 209: int circuit; ! 210: ! 211: ifc = cp->ifc; ! 212: qlock(ifc); ! 213: if(waserror()){ ! 214: qunlock(ifc); ! 215: if(cp->media){ ! 216: freeb(cp->media); ! 217: cp->media = 0; ! 218: } ! 219: freeb(bp); ! 220: nexterror(); ! 221: } ! 222: ! 223: /* ! 224: * init ! 225: */ ! 226: startconv(cp, (++(cp->version) * Nconv) + 2*(cp - ifc->conv), Cconnecting); ! 227: (*ifc->connect)(cp, (char *)bp->rptr); ! 228: strncpy(cp->raddr, (char *)bp->rptr, sizeof(cp->raddr)); ! 229: ! 230: qunlock(ifc); ! 231: freeb(bp); ! 232: poperror(); ! 233: } ! 234: ! 235: /* ! 236: * listen for calls from any interface ! 237: */ ! 238: static int ! 239: listendone(void *a) ! 240: { ! 241: Conversation *cp; ! 242: ! 243: cp = (Conversation *)a; ! 244: return cp->state != Clistening; ! 245: } ! 246: static void ! 247: listen(Conversation *cp, Block *bp) ! 248: { ! 249: freeb(bp); ! 250: if(cp->state >= Clistening){ ! 251: print("listen in use %d %ux %ux\n", cp->state, cp->ifc, interface); ! 252: error(0, Einuse); ! 253: } ! 254: cp->state = Clistening; ! 255: sleep(&cp->r, listendone, cp); ! 256: } ! 257: ! 258: /* ! 259: * send a hangup signal up the stream to get all line disciplines ! 260: * to cease and desist ! 261: */ ! 262: static void ! 263: hangup(Conversation *cp) ! 264: { ! 265: Block *bp; ! 266: Queue *q; ! 267: ! 268: cp->state = Chungup; ! 269: bp = allocb(0); ! 270: bp->type = M_HANGUP; ! 271: q = cp->rq; ! 272: PUTNEXT(q, bp); ! 273: wakeup(&cp->r); ! 274: } ! 275: ! 276: /* ! 277: * process a message acknowledgement. if the message ! 278: * has any xmit buffers queued, free them. ! 279: */ ! 280: static void ! 281: rcvack(Conversation *cp, int mid) ! 282: { ! 283: Msg *mp; ! 284: Block *bp; ! 285: int i; ! 286: ! 287: mp = &cp->out[mid & Nmask]; ! 288: ! 289: /* ! 290: * if already acked, ignore ! 291: */ ! 292: if(mp->acked || mp->mid != mid) ! 293: return; ! 294: mp->acked = 1; ! 295: cp->lastacked = mid; ! 296: ! 297: /* ! 298: * free buffers ! 299: */ ! 300: qlock(mp); ! 301: while(bp = getb(mp)) ! 302: freeb(bp); ! 303: qunlock(mp); ! 304: ! 305: /* ! 306: * advance the first pointer and wakeup any processes waiting for a mid ! 307: */ ! 308: if((mid&Nmask) == cp->first){ ! 309: cp->retry = 0; ! 310: for(i = cp->first; i!=cp->next && cp->out[i].acked; i = (i+1)&Nmask) ! 311: ; ! 312: cp->first = i; ! 313: wakeup(&cp->r); ! 314: } ! 315: } ! 316: ! 317: /* ! 318: * queue an acknowledgement to be sent. ignore it if we already have Nmsg ! 319: * acknowledgements queued. ! 320: */ ! 321: static void ! 322: queueack(Conversation *cp, int mid) ! 323: { ! 324: int next; ! 325: ulong now; ! 326: ! 327: now = NOW; ! 328: next = (cp->anext + 1)&Nmask; ! 329: if(next != cp->afirst){ ! 330: cp->ack[cp->anext] = mid; ! 331: cp->atime[cp->anext] = now + MSack; ! 332: cp->anext = next; ! 333: } ! 334: if(now > cp->atime[cp->afirst]) ! 335: wakeup(&cp->rq->r); ! 336: } ! 337: ! 338: /* ! 339: * make a packet header ! 340: */ ! 341: Block * ! 342: mkhdr(Conversation *cp, int rem) ! 343: { ! 344: Block *bp; ! 345: Hdr *hp; ! 346: ! 347: bp = allocb(cp->ifc->hsize + HDRSIZE + cp->ifc->mintu); ! 348: memcpy(bp->wptr, cp->media->rptr, cp->ifc->hsize + HDRSIZE); ! 349: bp->wptr += cp->ifc->hsize + HDRSIZE; ! 350: hp = (Hdr *)(bp->rptr + cp->ifc->hsize); ! 351: hp->remain[1] = rem>>8; ! 352: hp->remain[0] = rem; ! 353: hp->sum[0] = hp->sum[1] = 0; ! 354: return bp; ! 355: } ! 356: ! 357: /* ! 358: * transmit a message. this involves breaking a possibly multi-block message into ! 359: * a train of packets on the media. ! 360: * ! 361: * called by nonetoput() and nonetkproc(). the qlock(mp) synchronizes these two ! 362: * processes. ! 363: */ ! 364: static void ! 365: sendmsg(Conversation *cp, Msg *mp) ! 366: { ! 367: Interface *ifc; ! 368: Queue *wq; ! 369: int msgrem; ! 370: int pktrem; ! 371: int n; ! 372: Block *bp, *pkt, *last; ! 373: uchar *rptr; ! 374: ! 375: ifc = cp->ifc; ! 376: if(ifc == 0) ! 377: return; ! 378: wq = ifc->wq->next; ! 379: ! 380: /* ! 381: * one transmitter at a time ! 382: */ ! 383: qlock(&cp->xlock); ! 384: ! 385: /* ! 386: * synchronize with rcvack, don't want to send while the ! 387: * message is being freed. ! 388: */ ! 389: qlock(mp); ! 390: ! 391: if(waserror() || mp->acked){ ! 392: qunlock(&cp->xlock); ! 393: qunlock(mp); ! 394: return; ! 395: } ! 396: ! 397: /* ! 398: * get the next acknowledge to use if the next queue up ! 399: * is not full. ! 400: */ ! 401: if(cp->afirst != cp->anext && cp->rq->next->len < 16*1024){ ! 402: cp->hdr->ack = cp->ack[cp->afirst]; ! 403: cp->afirst = (cp->afirst+1)&Nmask; ! 404: } ! 405: cp->hdr->mid = mp->mid; ! 406: ! 407: if(ifc->mintu > mp->len) { ! 408: /* ! 409: * short message: ! 410: * copy the whole message into the header block ! 411: */ ! 412: last = pkt = mkhdr(cp, mp->len); ! 413: for(bp = mp->first; bp; bp = bp->next){ ! 414: memcpy(pkt->wptr, bp->rptr, n = BLEN(bp)); ! 415: pkt->wptr += n; ! 416: } ! 417: memset(pkt->wptr, 0, n = ifc->mintu - mp->len); ! 418: pkt->wptr += n; ! 419: } else { ! 420: /* ! 421: * long message: ! 422: * break up the message into interface packets and send them. ! 423: * once around this loop for each non-header block generated. ! 424: */ ! 425: msgrem = mp->len; ! 426: pktrem = msgrem > ifc->maxtu ? ifc->maxtu : msgrem; ! 427: bp = mp->first; ! 428: if(bp) ! 429: rptr = bp->rptr; ! 430: last = pkt = mkhdr(cp, msgrem); ! 431: while(bp){ ! 432: /* ! 433: * if pkt full, send and create new header block ! 434: */ ! 435: if(pktrem == 0){ ! 436: cksum(pkt, ifc->hsize); ! 437: last->flags |= S_DELIM; ! 438: (*wq->put)(wq, pkt); ! 439: last = pkt = mkhdr(cp, -msgrem); ! 440: pktrem = msgrem > ifc->maxtu ? ifc->maxtu : msgrem; ! 441: } ! 442: n = bp->wptr - rptr; ! 443: if(n > pktrem) ! 444: n = pktrem; ! 445: last = last->next = allocb(0); ! 446: last->rptr = rptr; ! 447: last->wptr = rptr = rptr + n; ! 448: msgrem -= n; ! 449: pktrem -= n; ! 450: if(rptr >= bp->wptr){ ! 451: bp = bp->next; ! 452: if(bp) ! 453: rptr = bp->rptr; ! 454: } ! 455: } ! 456: } ! 457: cksum(pkt, ifc->hsize); ! 458: last->flags |= S_DELIM; ! 459: (*wq->put)(wq, pkt); ! 460: mp->time = NOW + MSrexmit; ! 461: qunlock(mp); ! 462: qunlock(&cp->xlock); ! 463: poperror(); ! 464: } ! 465: ! 466: /* ! 467: * send a control message (hangup or acknowledgement). ! 468: */ ! 469: sendctlmsg(Conversation *cp, int flag, int new) ! 470: { ! 471: cp->ctl.len = 0; ! 472: cp->ctl.first = 0; ! 473: cp->ctl.acked = 0; ! 474: if(new) ! 475: cp->ctl.mid = Nmsg^cp->out[cp->next].mid; ! 476: else ! 477: cp->ctl.mid = cp->lastacked; ! 478: cp->hdr->flag |= flag; ! 479: sendmsg(cp, &cp->ctl); ! 480: } ! 481: ! 482: /* ! 483: * receive a message (called by the multiplexor; noetheriput, nofddiiput, ...) ! 484: */ ! 485: static void ! 486: rcvmsg(Conversation *cp, Block *bp) ! 487: { ! 488: Block *nbp; ! 489: Hdr *h; ! 490: short r; ! 491: int c; ! 492: Msg *mp; ! 493: int f; ! 494: Queue *q; ! 495: ! 496: q = cp->rq; ! 497: ! 498: /* ! 499: * grab the packet header, push the pointer past the nonet header ! 500: */ ! 501: h = (Hdr *)bp->rptr; ! 502: bp->rptr += HDRSIZE; ! 503: mp = &cp->in[h->mid & Nmask]; ! 504: r = (h->remain[1]<<8) | h->remain[0]; ! 505: f = h->flag; ! 506: ! 507: /* ! 508: * if a new call request comes in on a connected channel, hang up the call ! 509: */ ! 510: if(h->mid==0 && (f & NEWCALL) && cp->state==Cconnected){ ! 511: freeb(bp); ! 512: hangup(cp); ! 513: return; ! 514: } ! 515: ! 516: /* ! 517: * ignore old messages and process the acknowledgement ! 518: */ ! 519: if(h->mid != mp->mid){ ! 520: if(r == 0){ ! 521: rcvack(cp, h->ack); ! 522: if(f & HANGUP) ! 523: hangup(cp); ! 524: } else { ! 525: if(r>0) ! 526: queueack(cp, h->mid); ! 527: cp->bad++; ! 528: } ! 529: freeb(bp); ! 530: return; ! 531: } ! 532: ! 533: if(r>=0){ ! 534: /* ! 535: * start of message packet ! 536: */ ! 537: if(mp->first){ ! 538: cp->bad++; ! 539: freeb(bp); ! 540: return; ! 541: } ! 542: mp->rem = r; ! 543: } else { ! 544: /* ! 545: * a continuation ! 546: */ ! 547: if(-r != mp->rem) { ! 548: cp->bad++; ! 549: freeb(bp); ! 550: return; ! 551: } ! 552: } ! 553: ! 554: /* ! 555: * take care of packets that were padded up ! 556: */ ! 557: mp->rem -= BLEN(bp); ! 558: if(mp->rem < 0){ ! 559: if(-mp->rem <= BLEN(bp)){ ! 560: bp->wptr += mp->rem; ! 561: mp->rem = 0; ! 562: } else ! 563: panic("rcvmsg: short packet"); ! 564: } ! 565: putb(mp, bp); ! 566: ! 567: /* ! 568: * if the last chunk - pass it up the stream and wake any ! 569: * waiting process. ! 570: * ! 571: * if not, strip off the delimiter. ! 572: */ ! 573: if(mp->rem == 0){ ! 574: rcvack(cp, h->ack); ! 575: if(f & ACKME) ! 576: queueack(cp, h->mid); ! 577: mp->last->flags |= S_DELIM; ! 578: PUTNEXT(q, mp->first); ! 579: mp->first = mp->last = 0; ! 580: mp->len = 0; ! 581: cp->rcvd++; ! 582: ! 583: /* ! 584: * cycle bufffer to next expected mid ! 585: */ ! 586: mp->mid ^= Nmsg; ! 587: ! 588: /* ! 589: * stop xmitting the NEWCALL flag ! 590: */ ! 591: if(cp->state==Cconnecting && !(f & NEWCALL)) ! 592: cp->state = Cconnected; ! 593: } else ! 594: mp->last->flags &= ~S_DELIM; ! 595: ! 596: } ! 597: ! 598: /* ! 599: * the device stream module definition ! 600: */ ! 601: static void nonetstopen(Queue *, Stream *); ! 602: static void nonetstclose(Queue *); ! 603: static void nonetoput(Queue *, Block *); ! 604: static void nonetiput(Queue *, Block *); ! 605: Qinfo nonetinfo = { nonetiput, nonetoput, nonetstopen, nonetstclose, "nonet" } ; ! 606: ! 607: /* ! 608: * store the device end of the stream so that the multiplexor can ! 609: * send blocks upstream. this is called by streamopen() when a ! 610: * nonet device steam is created. ! 611: */ ! 612: static void ! 613: nonetstopen(Queue *q, Stream *s) ! 614: { ! 615: Interface *ifc; ! 616: Conversation *cp; ! 617: ! 618: ifc = &interface[s->dev]; ! 619: cp = &ifc->conv[s->id]; ! 620: cp->ifc = ifc; ! 621: cp->rq = RD(q); ! 622: cp->state = Copen; ! 623: RD(q)->ptr = WR(q)->ptr = (void *)cp; ! 624: } ! 625: ! 626: /* ! 627: * wait until all output has drained or a hangup is received. ! 628: * then send a hangup message (until one is received). ! 629: */ ! 630: static int ! 631: isflushed(void *a) ! 632: { ! 633: Conversation *cp; ! 634: ! 635: cp = (Conversation *)a; ! 636: return cp->first == cp->next || cp->state == Chungup; ! 637: } ! 638: static int ! 639: ishungup(void *a) ! 640: { ! 641: Conversation *cp; ! 642: ! 643: cp = (Conversation *)a; ! 644: return cp->state == Chungup; ! 645: } ! 646: static int ! 647: isdead(void *a) ! 648: { ! 649: Conversation *cp; ! 650: ! 651: cp = (Conversation *)a; ! 652: return cp->kstarted == 0; ! 653: } ! 654: static void ! 655: nonetstclose(Queue *q) ! 656: { ! 657: Conversation *cp; ! 658: Msg *mp; ! 659: int i; ! 660: ! 661: cp = (Conversation *)q->ptr; ! 662: ! 663: /* ! 664: * wait for all messages to drain ! 665: */ ! 666: while(!isflushed(cp)) ! 667: sleep(&cp->r, isflushed, cp); ! 668: ! 669: /* ! 670: * ack all outstanding messages ! 671: */ ! 672: for(; cp->first != cp->next; cp->first = (cp->first+1)&Nmask) { ! 673: mp = &cp->out[cp->first]; ! 674: if(!mp->acked) ! 675: rcvack(cp, mp->mid); ! 676: } ! 677: cp->first = cp->next; ! 678: ! 679: /* ! 680: * send hangup messages to the other side ! 681: * until it hangs up. ! 682: */ ! 683: if(cp->state >= Cconnected){ ! 684: sendctlmsg(cp, HANGUP, 1); ! 685: for(i=0; i<10 && !ishungup(cp); i++){ ! 686: sendctlmsg(cp, HANGUP, 1); ! 687: tsleep(&cp->r, ishungup, cp, MSrexmit); ! 688: } ! 689: } ! 690: ! 691: /* ! 692: * kill off the nonetkproc ! 693: */ ! 694: cp->state = Cclosed; ! 695: wakeup(&cp->rq->r); ! 696: sleep(&cp->r, isdead, cp); ! 697: ! 698: /* ! 699: * close down, synchronizing with interface ! 700: */ ! 701: qlock(cp); ! 702: cp->ifc = 0; ! 703: qunlock(cp); ! 704: } ! 705: ! 706: /* ! 707: * send all messages up stream. this should only be control messages ! 708: */ ! 709: static void ! 710: nonetiput(Queue *q, Block *bp) ! 711: { ! 712: Conversation *cp; ! 713: ! 714: if(bp->type == M_HANGUP){ ! 715: cp = (Conversation *)q->ptr; ! 716: cp->state = Chungup; ! 717: } ! 718: PUTNEXT(q, bp); ! 719: } ! 720: ! 721: /* ! 722: * queue a block ! 723: */ ! 724: static int ! 725: windowopen(void *a) ! 726: { ! 727: Conversation *cp; ! 728: ! 729: cp = (Conversation *)a; ! 730: return (cp->next + 1)&Nmask != cp->first; ! 731: } ! 732: static void ! 733: nonetoput(Queue *q, Block *bp) ! 734: { ! 735: Conversation *cp; ! 736: int next; ! 737: Msg *mp; ! 738: ! 739: cp = (Conversation *)(q->ptr); ! 740: ! 741: /* ! 742: * do all control functions ! 743: */ ! 744: if(bp->type != M_DATA){ ! 745: if(streamparse("connect", bp)) ! 746: connect(cp, bp); ! 747: else if(streamparse("listen", bp)) ! 748: listen(cp, bp); ! 749: else ! 750: freeb(bp); ! 751: return; ! 752: } ! 753: ! 754: /* ! 755: * collect till we see a delim ! 756: */ ! 757: if(!putb(q, bp)) ! 758: return; ! 759: ! 760: /* ! 761: * block if we don't have any free mid's ! 762: */ ! 763: while((next = (cp->next + 1)&Nmask) == cp->first) ! 764: sleep(&cp->r, windowopen, cp); ! 765: ! 766: /* ! 767: * stick the message in a Msg structure ! 768: */ ! 769: mp = &cp->out[cp->next]; ! 770: mp->time = NOW + MSrexmit; ! 771: mp->first = q->first; ! 772: mp->last = q->last; ! 773: mp->len = q->len; ! 774: mp->mid ^= Nmsg; ! 775: mp->acked = 0; ! 776: ! 777: /* ! 778: * init the queue for new messages ! 779: */ ! 780: q->len = 0; ! 781: q->first = q->last = 0; ! 782: ! 783: /* ! 784: * send it ! 785: */ ! 786: cp->next = next; ! 787: sendmsg(cp, mp); ! 788: cp->sent++; ! 789: } ! 790: ! 791: /* ! 792: * wake up every 250 ms to send and ack or resend a message ! 793: */ ! 794: static int ! 795: always(void *a) ! 796: { ! 797: return 0; ! 798: } ! 799: void ! 800: nonetkproc(void *arg) ! 801: { ! 802: Conversation *cp; ! 803: Interface *ifc; ! 804: Queue *wq; ! 805: Msg *mp; ! 806: int i; ! 807: ! 808: cp = (Conversation *)arg; ! 809: ! 810: for(;;){ ! 811: /* ! 812: * die on request ! 813: */ ! 814: if(cp->state == Cclosed){ ! 815: cp->kstarted = 0; ! 816: wakeup(&cp->r); ! 817: return; ! 818: } ! 819: ! 820: /* ! 821: * retransmit first message ! 822: */ ! 823: if(cp->first != cp->next){ ! 824: mp = &cp->out[cp->first]; ! 825: if(!mp->acked && NOW >= mp->time){ ! 826: if(cp->retry++ > 400) ! 827: hangup(cp); ! 828: else { ! 829: cp->rexmit++; ! 830: sendmsg(cp, mp); ! 831: } ! 832: } ! 833: } ! 834: ! 835: /* ! 836: * send any ack whose time is come ! 837: */ ! 838: while(cp->afirst != cp->anext && NOW >= cp->atime[cp->anext] ! 839: && cp->rq->next->len < Streamhi) ! 840: sendctlmsg(cp, 0, 0); ! 841: tsleep(&cp->rq->r, always, 0, MSrexmit/2); ! 842: } ! 843: } ! 844: ! 845: /* ! 846: * nonet directory and subdirectory ! 847: */ ! 848: enum { ! 849: Nraddrqid, ! 850: Nstatsqid, ! 851: Nchanqid, ! 852: Ncloneqid, ! 853: }; ! 854: Dirtab nonetdir[Ndir]; ! 855: Dirtab nosubdir[]={ ! 856: "raddr", Nraddrqid, 0, 0600, ! 857: "stats", Nstatsqid, 0, 0600, ! 858: }; ! 859: ! 860: /* ! 861: * nonet file system. most of the calls use dev.c to access the nonet ! 862: * directory and stream.c to access the nonet devices. ! 863: */ ! 864: void ! 865: nonetreset(void) ! 866: { ! 867: newqinfo(&noetherinfo); ! 868: } ! 869: ! 870: /* ! 871: * create the nonet directory. the files are `clone' and stream ! 872: * directories '1' to '32' (or whatever Nconv is in decimal) ! 873: */ ! 874: void ! 875: nonetinit(void) ! 876: { ! 877: int i; ! 878: ! 879: /* ! 880: * create the directory. ! 881: */ ! 882: /* ! 883: * the circuits ! 884: */ ! 885: for(i = 0; i < Nconv; i++) { ! 886: sprint(nonetdir[i].name, "%d", i); ! 887: nonetdir[i].qid = CHDIR|STREAMQID(i, Nchanqid); ! 888: nonetdir[i].length = 0; ! 889: nonetdir[i].perm = 0600; ! 890: } ! 891: ! 892: /* ! 893: * the clone device ! 894: */ ! 895: strcpy(nonetdir[i].name, "clone"); ! 896: nonetdir[i].qid = Ncloneqid; ! 897: nonetdir[i].length = 0; ! 898: nonetdir[i].perm = 0600; ! 899: ! 900: } ! 901: ! 902: Chan* ! 903: nonetattach(char *spec) ! 904: { ! 905: Interface *ifc; ! 906: Chan *c; ! 907: ! 908: /* ! 909: * find an interface with the same name ! 910: */ ! 911: for(ifc = interface; ifc < &interface[Nifc]; ifc++){ ! 912: qlock(ifc); ! 913: if(strcmp(spec, ifc->name)==0 && ifc->wq) { ! 914: ifc->ref++; ! 915: qunlock(ifc); ! 916: break; ! 917: } ! 918: qunlock(ifc); ! 919: } ! 920: if(ifc == &interface[Nifc]) ! 921: error(0, Enoifc); ! 922: c = devattach('n', spec); ! 923: c->dev = 0; ! 924: return c; ! 925: } ! 926: ! 927: Chan* ! 928: nonetclone(Chan *c, Chan *nc) ! 929: { ! 930: Interface *ifc; ! 931: ! 932: c = devclone(c, nc); ! 933: ifc = &interface[c->dev]; ! 934: qlock(ifc); ! 935: ifc->ref++; ! 936: qunlock(ifc); ! 937: return c; ! 938: } ! 939: ! 940: int ! 941: nonetwalk(Chan *c, char *name) ! 942: { ! 943: if(c->qid == CHDIR) ! 944: return devwalk(c, name, nonetdir, Ndir, devgen); ! 945: else ! 946: return devwalk(c, name, nosubdir, Nsubdir, streamgen); ! 947: } ! 948: ! 949: void ! 950: nonetstat(Chan *c, char *dp) ! 951: { ! 952: if(c->qid == CHDIR) ! 953: devstat(c, dp, nonetdir, Ndir, devgen); ! 954: else ! 955: devstat(c, dp, nosubdir, Nsubdir, streamgen); ! 956: } ! 957: ! 958: /* ! 959: * opening a nonet device allocates a Conversation. Opening the `clone' ! 960: * device is a ``macro'' for finding a free Conversation and opening ! 961: * it's ctl file. ! 962: */ ! 963: Chan* ! 964: nonetopen(Chan *c, int omode) ! 965: { ! 966: extern Qinfo nonetinfo; ! 967: Stream *s; ! 968: Conversation *cp; ! 969: Interface *ifc; ! 970: ! 971: if(c->qid == Ncloneqid){ ! 972: ifc = &interface[c->dev]; ! 973: for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){ ! 974: if(cp->state == Cclosed && canqlock(cp)){ ! 975: if(cp->state != Cclosed){ ! 976: qunlock(cp); ! 977: continue; ! 978: } ! 979: c->qid = CHDIR|STREAMQID(cp-ifc->conv, Nchanqid); ! 980: devwalk(c, "ctl", 0, 0, streamgen); ! 981: streamopen(c, &nonetinfo); ! 982: qunlock(cp); ! 983: break; ! 984: } ! 985: } ! 986: if(cp == &ifc->conv[Nconv]) ! 987: error(0, Enodev); ! 988: } else if(c->qid != CHDIR) ! 989: streamopen(c, &nonetinfo); ! 990: ! 991: c->mode = openmode(omode); ! 992: c->flag |= COPEN; ! 993: c->offset = 0; ! 994: return c; ! 995: } ! 996: ! 997: void ! 998: nonetcreate(Chan *c, char *name, int omode, ulong perm) ! 999: { ! 1000: error(0, Eperm); ! 1001: } ! 1002: ! 1003: void ! 1004: nonetclose(Chan *c) ! 1005: { ! 1006: Interface *ifc; ! 1007: ! 1008: /* real closing happens in lancestclose */ ! 1009: if(c->qid != CHDIR) ! 1010: streamclose(c); ! 1011: ! 1012: ifc = &interface[c->dev]; ! 1013: qlock(ifc); ! 1014: ifc->ref--; ! 1015: qunlock(ifc); ! 1016: } ! 1017: ! 1018: long ! 1019: nonetread(Chan *c, void *a, long n) ! 1020: { ! 1021: int t; ! 1022: Conversation *cp; ! 1023: char stats[256]; ! 1024: ! 1025: t = STREAMTYPE(c->qid); ! 1026: if(t >= Slowqid) ! 1027: return streamread(c, a, n); ! 1028: ! 1029: if(c->qid == CHDIR) ! 1030: return devdirread(c, a, n, nonetdir, Ndir, devgen); ! 1031: if(c->qid & CHDIR) ! 1032: return devdirread(c, a, n, nosubdir, Nsubdir, streamgen); ! 1033: ! 1034: cp = &interface[c->dev].conv[STREAMID(c->qid)]; ! 1035: switch(t){ ! 1036: case Nraddrqid: ! 1037: return stringread(c, a, n, cp->raddr); ! 1038: case Nstatsqid: ! 1039: sprint(stats, "sent: %d\nrcved: %d\nrexmit: %d\nbad: %d\n", ! 1040: cp->sent, cp->rcvd, cp->rexmit, cp->bad); ! 1041: return stringread(c, a, n, stats); ! 1042: } ! 1043: error(0, Eperm); ! 1044: } ! 1045: ! 1046: long ! 1047: nonetwrite(Chan *c, void *a, long n) ! 1048: { ! 1049: int t; ! 1050: ! 1051: t = STREAMTYPE(c->qid); ! 1052: if(t >= Slowqid) ! 1053: return streamwrite(c, a, n, 0); ! 1054: ! 1055: error(0, Eperm); ! 1056: } ! 1057: ! 1058: void ! 1059: nonetremove(Chan *c) ! 1060: { ! 1061: error(0, Eperm); ! 1062: } ! 1063: ! 1064: void ! 1065: nonetwstat(Chan *c, char *dp) ! 1066: { ! 1067: error(0, Eperm); ! 1068: } ! 1069: ! 1070: void ! 1071: noneterrstr(Error *e, char *buf) ! 1072: { ! 1073: rooterrstr(e, buf); ! 1074: } ! 1075: ! 1076: void ! 1077: nonetuserstr(Error *e, char *buf) ! 1078: { ! 1079: extern consuserstr(Error *, char *); ! 1080: ! 1081: consuserstr(e, buf); ! 1082: } ! 1083: ! 1084: /* ! 1085: * interface ! 1086: */ ! 1087: /* ! 1088: * Create an interface. The qlock on ifc prevents a circuit ! 1089: * from connecting to (nonetconnect) or outputting on (nonetoput) ! 1090: * the interface while it is being created. ! 1091: */ ! 1092: Interface * ! 1093: newifc(Queue *q, Stream *s, int maxtu, int mintu, int hsize, ! 1094: void (*connect)(Conversation *, char *)) ! 1095: { ! 1096: Interface *ifc; ! 1097: ! 1098: for(ifc = interface; ifc < &interface[Nifc]; ifc++){ ! 1099: if(ifc->wq == 0){ ! 1100: qlock(ifc); ! 1101: if(ifc->wq) { ! 1102: /* someone was faster than us */ ! 1103: qunlock(ifc); ! 1104: continue; ! 1105: } ! 1106: RD(q)->ptr = WR(q)->ptr = (void *)ifc; ! 1107: ifc->maxtu = maxtu - hsize - HDRSIZE; ! 1108: ifc->mintu = mintu - hsize - HDRSIZE; ! 1109: ifc->hsize = hsize; ! 1110: ifc->connect = connect; ! 1111: ifc->wq = WR(q); ! 1112: ifc->name[0] = 0; ! 1113: qunlock(ifc); ! 1114: return ifc; ! 1115: } ! 1116: } ! 1117: error(0, Enoifc); ! 1118: } ! 1119: ! 1120: /* ! 1121: * Free an interface. ! 1122: */ ! 1123: void ! 1124: freeifc(Interface *ifc) ! 1125: { ! 1126: qlock(ifc); ! 1127: if(ifc->ref){ ! 1128: qunlock(ifc); ! 1129: print("freeifc in use\n"); ! 1130: error(0, Einuse); ! 1131: } ! 1132: ifc->wq = 0; ! 1133: qunlock(ifc); ! 1134: } ! 1135: /* ! 1136: * ethernet specific multiplexor ! 1137: */ ! 1138: /* ! 1139: * ethernet header of a packet ! 1140: */ ! 1141: struct Etherhdr { ! 1142: uchar d[6]; ! 1143: uchar s[6]; ! 1144: uchar type[2]; ! 1145: uchar circuit[3]; /* circuit number */ ! 1146: uchar flag; ! 1147: uchar mid; /* message id */ ! 1148: uchar ack; /* piggy back ack */ ! 1149: uchar remain[2]; /* count of remaing bytes of data */ ! 1150: uchar sum[2]; /* checksum (0 means none) */ ! 1151: }; ! 1152: #define EHDRSIZE 24 ! 1153: #define EMAXBODY (1514-HDRSIZE) /* maximum ethernet packet body */ ! 1154: #define ETHER_TYPE 0x900 /* most significant byte last */ ! 1155: ! 1156: /* ! 1157: * the ethernet multiplexor stream module definition ! 1158: */ ! 1159: static void noetheropen(Queue *, Stream *); ! 1160: static void noetherclose(Queue *); ! 1161: static void noetheroput(Queue *, Block *); ! 1162: static void noetheriput(Queue *, Block *); ! 1163: Qinfo noetherinfo = { noetheriput, nullput, noetheropen, noetherclose, "noether" }; ! 1164: ! 1165: /* ! 1166: * parse an ethernet address (assumed to be 12 ascii hex digits) ! 1167: */ ! 1168: void ! 1169: etherparse(uchar *to, char *from) ! 1170: { ! 1171: int tdig; ! 1172: int fdig; ! 1173: int i; ! 1174: ! 1175: if(strlen(from) != 12) ! 1176: error(0, Ebadnet); ! 1177: ! 1178: for(i = 0; i < 6; i++){ ! 1179: fdig = *from++; ! 1180: tdig = fdig > 'a' ? (fdig - 'a' + 10) ! 1181: : (fdig > 'A' ? (fdig - 'A' + 10) : (fdig - '0')); ! 1182: fdig = *from++; ! 1183: tdig <<= 4; ! 1184: tdig |= fdig > 'a' ? (fdig - 'a' + 10) ! 1185: : (fdig > 'A' ? (fdig - 'A' + 10) : (fdig - '0')); ! 1186: *to++ = tdig; ! 1187: } ! 1188: } ! 1189: ! 1190: /* ! 1191: * perfrorm the ether specific part of nonetconnect. just stick ! 1192: * the address into the prototype header. ! 1193: */ ! 1194: void ! 1195: noetherconnect(Conversation *cp, char *ea) ! 1196: { ! 1197: Etherhdr *eh; ! 1198: ! 1199: /* ! 1200: * special hack for ross ! 1201: */ ! 1202: if(strcmp(ea, "020701005eff")==0) ! 1203: cp->hdr->flag &= ~ACKME; ! 1204: ! 1205: eh = (Etherhdr *)cp->media->rptr; ! 1206: etherparse(eh->d, ea); ! 1207: eh->type[0] = ETHER_TYPE>>8; ! 1208: eh->type[1] = ETHER_TYPE; ! 1209: } ! 1210: ! 1211: /* ! 1212: * set up an ether interface ! 1213: */ ! 1214: static void ! 1215: noetheropen(Queue *q, Stream *s) ! 1216: { ! 1217: newifc(q, s, 1514, 60, 14, noetherconnect); ! 1218: } ! 1219: ! 1220: /* ! 1221: * tear down an ether interface ! 1222: */ ! 1223: static void ! 1224: noetherclose(Queue *q) ! 1225: { ! 1226: Interface *ifc; ! 1227: ! 1228: ifc = (Interface *)(q->ptr); ! 1229: freeifc(ifc); ! 1230: } ! 1231: ! 1232: /* ! 1233: * Input a packet and use the ether address to select the correct ! 1234: * nonet device to pass it to. ! 1235: * ! 1236: * Simplifying assumption: one put == one packet && the complete header ! 1237: * is in the first block. If this isn't true, demultiplexing will not work. ! 1238: */ ! 1239: static void ! 1240: noetheriput(Queue *q, Block *bp) ! 1241: { ! 1242: Interface *ifc; ! 1243: int circuit; ! 1244: Conversation *cp; ! 1245: Etherhdr *h; ! 1246: Etherhdr *ph; ! 1247: ulong s; ! 1248: Block *end; ! 1249: ! 1250: if(bp->type != M_DATA){ ! 1251: PUTNEXT(q, bp); ! 1252: return; ! 1253: } ! 1254: ! 1255: ifc = (Interface *)(q->ptr); ! 1256: h = (Etherhdr *)(bp->rptr); ! 1257: circuit = (h->circuit[2]<<16) | (h->circuit[1]<<8) | h->circuit[0]; ! 1258: s = (h->sum[1]<<8) | h->sum[0]; ! 1259: if(s && s != cksum(bp, 14)){ ! 1260: print("checksum error %ux %ux\n", s, (h->sum[1]<<8) | h->sum[0]); ! 1261: freeb(bp); ! 1262: return; ! 1263: } ! 1264: ! 1265: /* ! 1266: * look for an existing circuit. ! 1267: */ ! 1268: for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){ ! 1269: if(cp->state > Clistening && circuit == cp->rcvcircuit && canqlock(cp)){ ! 1270: ph = (Etherhdr *)(cp->media->rptr); ! 1271: if(ifc == cp->ifc ! 1272: && circuit == cp->rcvcircuit ! 1273: && cp->state > Clistening ! 1274: && memcmp(ph->d, h->s, sizeof(h->s)) == 0){ ! 1275: cp->hdr->flag &= ~NEWCALL; ! 1276: bp->rptr += ifc->hsize; ! 1277: rcvmsg(cp, bp); ! 1278: qunlock(cp); ! 1279: return; ! 1280: } ! 1281: qunlock(cp); ! 1282: } ! 1283: } ! 1284: ! 1285: /* ! 1286: * ... or one just listening ! 1287: */ ! 1288: if((h->flag & NEWCALL) == 0) { ! 1289: freeb(bp); ! 1290: return; ! 1291: } ! 1292: for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){ ! 1293: if(cp->state == Clistening && canqlock(cp)) { ! 1294: /* ! 1295: * initialize the conversation ! 1296: */ ! 1297: startconv(cp, circuit^1, Cconnecting); ! 1298: wakeup(&cp->r); ! 1299: sprint(cp->raddr, "%.2ux%.2ux%.2ux%.2ux%.2ux%.2ux", h->s[0], ! 1300: h->s[1], h->s[2], h->s[3], h->s[4], h->s[5]); ! 1301: ! 1302: /* ! 1303: * fill in media dependent prototype header ! 1304: */ ! 1305: ph = (Etherhdr *)(cp->media->rptr); ! 1306: memcpy(ph->d, h->s, sizeof(h->s)); ! 1307: ph->type[0] = ETHER_TYPE>>8; ! 1308: ph->type[1] = ETHER_TYPE; ! 1309: ! 1310: /* ! 1311: * pass on the packet ! 1312: */ ! 1313: bp->rptr += ifc->hsize; ! 1314: rcvmsg(cp, bp); ! 1315: qunlock(cp); ! 1316: return; ! 1317: } ! 1318: } ! 1319: ! 1320: /* ! 1321: * not found ! 1322: */ ! 1323: freeb(bp); ! 1324: } ! 1325: ! 1326: /* ! 1327: * calculate the checksum of a list of blocks. ignore the first `offset' bytes. ! 1328: */ ! 1329: int ! 1330: cksum(Block *bp, int offset) ! 1331: { ! 1332: Block *nbp = bp; ! 1333: uchar *ep, *p; ! 1334: int n; ! 1335: ulong s; ! 1336: Hdr *hp; ! 1337: ! 1338: s = 0; ! 1339: p = bp->rptr + offset; ! 1340: n = bp->wptr - p; ! 1341: hp = (Hdr *)p; ! 1342: hp->sum[0] = hp->sum[1] = 0; ! 1343: for(;;){ ! 1344: ep = p+(n&~0x7); ! 1345: while(p < ep) { ! 1346: s = s + s + p[0]; ! 1347: s = s + s + p[1]; ! 1348: s = s + s + p[2]; ! 1349: s = s + s + p[3]; ! 1350: s = s + s + p[4]; ! 1351: s = s + s + p[5]; ! 1352: s = s + s + p[6]; ! 1353: s = s + s + p[7]; ! 1354: s = (s&0xffff) + (s>>16); ! 1355: p += 8; ! 1356: } ! 1357: ep = p+(n&0x7); ! 1358: while(p < ep) { ! 1359: s = s + s + *p; ! 1360: p++; ! 1361: } ! 1362: s = (s&0xffff) + (s>>16); ! 1363: bp = bp->next; ! 1364: if(bp == 0) ! 1365: break; ! 1366: p = bp->rptr; ! 1367: n = BLEN(bp); ! 1368: } ! 1369: s = (s&0xffff) + (s>>16); ! 1370: hp->sum[1] = s>>8; ! 1371: hp->sum[0] = s; ! 1372: return s & 0xffff; ! 1373: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.