Annotation of lucent/sys/src/9/port/sturp.c, revision 1.1.1.1

1.1       root        1: #include       "u.h"
                      2: #include       "../port/lib.h"
                      3: #include       "mem.h"
                      4: #include       "dat.h"
                      5: #include       "fns.h"
                      6: #include       "io.h"
                      7: #include       "../port/error.h"
                      8: 
                      9: enum {
                     10:        MSrexmit=       1000,
                     11:        Nmask=          0x7,
                     12: };
                     13: 
                     14: #define DPRINT if(q->flag&QDEBUG)kprint
                     15: 
                     16: typedef struct Urp     Urp;
                     17: 
                     18: #define NOW (MACHP(0)->ticks*MS2HZ)
                     19: 
                     20: /*
                     21:  * URP status
                     22:  */
                     23: struct urpstat {
                     24:        ulong   input;          /* bytes read from urp */
                     25:        ulong   output;         /* bytes output to urp */
                     26:        ulong   rexmit;         /* retransmit rejected urp msg */
                     27:        ulong   rjtrs;          /* reject, trailer size */
                     28:        ulong   rjpks;          /* reject, packet size */
                     29:        ulong   rjseq;          /* reject, sequence number */
                     30:        ulong   levelb;         /* unknown level b */
                     31:        ulong   enqsx;          /* enqs sent */
                     32:        ulong   enqsr;          /* enqs rcved */
                     33: } urpstat;
                     34: 
                     35: struct Urp {
                     36:        QLock;
                     37:        Urp     *list;          /* list of all urp structures */
                     38:        short   state;          /* flags */
                     39:        Rendez  r;              /* process waiting for output to finish */
                     40: 
                     41:        /* input */
                     42:        QLock   ack;            /* ack lock */
                     43:        Queue   *rq;            /* input queue */
                     44:        uchar   iseq;           /* last good input sequence number */
                     45:        uchar   lastecho;       /* last echo/rej sent */
                     46:        uchar   trbuf[3];       /* trailer being collected */
                     47:        short   trx;            /* # bytes in trailer being collected */
                     48:        int     blocks;
                     49: 
                     50:        /* output */
                     51:        QLock   xmit;           /* output lock, only one process at a time */
                     52:        Queue   *wq;            /* output queue */
                     53:        int     maxout;         /* maximum outstanding unacked blocks */
                     54:        int     maxblock;       /* max block size */
                     55:        int     next;           /* next block to send */
                     56:        int     unechoed;       /* first unechoed block */
                     57:        int     unacked;        /* first unacked block */
                     58:        int     nxb;            /* next xb to use */
                     59:        Block   *xb[8];         /* the xmit window buffer */
                     60:        QLock   xl[8];
                     61:        ulong   timer;          /* timeout for xmit */
                     62:        int     rexmit;
                     63: };
                     64: 
                     65: /* list of allocated urp structures (never freed) */
                     66: struct
                     67: {
                     68:        Lock;
                     69:        Urp     *urp;
                     70: } urpalloc;
                     71: 
                     72: Rendez urpkr;
                     73: QLock  urpkl;
                     74: int    urpkstarted;
                     75: 
                     76: #define WINDOW(u) ((u)->unechoed>(u)->next ? (u)->unechoed+(u)->maxout-(u)->next-8 :\
                     77:                        (u)->unechoed+(u)->maxout-(u)->next)
                     78: #define IN(x, f, n) (f<=n ? (x>=f && x<n) : (x<n || x>=f))
                     79: #define NEXT(x) (((x)+1)&Nmask)
                     80: 
                     81: /*
                     82:  *  Protocol control bytes
                     83:  */
                     84: #define        SEQ     0010            /* sequence number, ends trailers */
                     85: #undef ECHO
                     86: #define        ECHO    0020            /* echos, data given to next queue */
                     87: #define        REJ     0030            /* rejections, transmission error */
                     88: #define        ACK     0040            /* acknowledgments */
                     89: #define        BOT     0050            /* beginning of trailer */
                     90: #define        BOTM    0051            /* beginning of trailer, more data follows */
                     91: #define        BOTS    0052            /* seq update algorithm on this trailer */
                     92: #define        SOU     0053            /* start of unsequenced trailer */
                     93: #define        EOU     0054            /* end of unsequenced trailer */
                     94: #define        ENQ     0055            /* xmitter requests flow/error status */
                     95: #define        CHECK   0056            /* xmitter requests error status */
                     96: #define        INITREQ 0057            /* request initialization */
                     97: #define        INIT0   0060            /* disable trailer processing */
                     98: #define        INIT1   0061            /* enable trailer procesing */
                     99: #define        AINIT   0062            /* response to INIT0/INIT1 */
                    100: #undef DELAY
                    101: #define        DELAY   0100            /* real-time printing delay */
                    102: #define        BREAK   0110            /* Send/receive break (new style) */
                    103: 
                    104: #define        REJECTING       0x1
                    105: #define        INITING         0x2
                    106: #define HUNGUP         0x4
                    107: #define        OPEN            0x8
                    108: #define CLOSING                0x10
                    109: 
                    110: /*
                    111:  *  predeclared
                    112:  */
                    113: static void    urpreset(void);
                    114: static void    urpciput(Queue*, Block*);
                    115: static void    urpiput(Queue*, Block*);
                    116: static void    urpoput(Queue*, Block*);
                    117: static void    urpopen(Queue*, Stream*);
                    118: static void    urpclose(Queue *);
                    119: static void    output(Urp*);
                    120: static void    sendblock(Urp*, int);
                    121: static void    rcvack(Urp*, int);
                    122: static void    flushinput(Urp*);
                    123: static void    sendctl(Urp*, int);
                    124: static void    sendack(Urp*);
                    125: static void    sendrej(Urp*);
                    126: static void    initoutput(Urp*, int);
                    127: static void    initinput(Urp*);
                    128: static void    urpkproc(void *arg);
                    129: static void    urpvomit(char*, Urp*);
                    130: static void    tryoutput(Urp*);
                    131: 
                    132: Qinfo urpinfo =
                    133: {
                    134:        urpciput,
                    135:        urpoput,
                    136:        urpopen,
                    137:        urpclose,
                    138:        "urp",
                    139:        urpreset
                    140: };
                    141: 
                    142: void
                    143: sturplink(void)
                    144: {
                    145:        newqinfo(&urpinfo);
                    146: }
                    147: 
                    148: static void
                    149: urpreset(void)
                    150: {
                    151: }
                    152: 
                    153: static void
                    154: urpopen(Queue *q, Stream *s)
                    155: {
                    156:        Urp *up;
                    157: 
                    158:        USED(s);
                    159:        if(!urpkstarted){
                    160:                qlock(&urpkl);
                    161:                if(!urpkstarted){
                    162:                        urpkstarted = 1;
                    163:                        kproc("urpkproc", urpkproc, 0);
                    164:                }
                    165:                qunlock(&urpkl);
                    166:        }
                    167: 
                    168:        /*
                    169:         *  find an unused urp structure
                    170:         */
                    171:        for(up = urpalloc.urp; up; up = up->list){
                    172:                if(up->state == 0){
                    173:                        qlock(up);
                    174:                        if(up->state == 0)
                    175:                                break;
                    176:                        qunlock(up);
                    177:                }
                    178:        }
                    179:        if(up == 0){
                    180:                /*
                    181:                 *  none available, create a new one, they are never freed
                    182:                 */
                    183:                up = smalloc(sizeof(Urp));
                    184:                qlock(up);
                    185:                lock(&urpalloc);
                    186:                up->list = urpalloc.urp;
                    187:                urpalloc.urp = up;
                    188:                unlock(&urpalloc);
                    189:        }
                    190:        q->ptr = q->other->ptr = up;
                    191:        q->rp = &urpkr;
                    192:        up->rq = q;
                    193:        up->wq = q->other;
                    194:        up->state = OPEN;
                    195:        qunlock(up);
                    196:        initinput(up);
                    197:        initoutput(up, 0);
                    198: }
                    199: 
                    200: /*
                    201:  *  Shut down the connection and kill off the kernel process
                    202:  */
                    203: static int
                    204: isflushed(void *a)
                    205: {
                    206:        Urp *up;
                    207: 
                    208:        up = (Urp *)a;
                    209:        return (up->state&HUNGUP) || (up->unechoed==up->nxb && up->wq->len==0);
                    210: }
                    211: static void
                    212: urpclose(Queue *q)
                    213: {
                    214:        Urp *up;
                    215:        int i;
                    216: 
                    217:        up = (Urp *)q->ptr;
                    218:        if(up == 0)
                    219:                return;
                    220: 
                    221:        /*
                    222:         *  wait for all outstanding messages to drain, tell kernel
                    223:         *  process we're closing.
                    224:         *
                    225:         *  if 2 minutes elapse, give it up
                    226:         */
                    227:        up->state |= CLOSING;
                    228:        if(!waserror()){
                    229:                tsleep(&up->r, isflushed, up, 2*60*1000);
                    230:                poperror();
                    231:        }
                    232:        up->state |= HUNGUP;
                    233: 
                    234:        qlock(&up->xmit);
                    235:        /*
                    236:         *  ack all outstanding messages
                    237:         */
                    238:        i = up->next - 1;
                    239:        if(i < 0)
                    240:                i = 7;
                    241:        rcvack(up, ECHO+i);
                    242: 
                    243:        /*
                    244:         *  free all staged but unsent messages
                    245:         */
                    246:        for(i = 0; i < 7; i++){
                    247:                qlock(&up->xl[i]);
                    248:                if(up->xb[i]){
                    249:                        freeb(up->xb[i]);
                    250:                        up->xb[i] = 0;
                    251:                }
                    252:                qunlock(&up->xl[i]);
                    253:        }
                    254:        qunlock(&up->xmit);
                    255: 
                    256:        qlock(up);
                    257:        up->state = 0;
                    258:        qunlock(up);
                    259: }
                    260: 
                    261: /*
                    262:  *  upstream control messages
                    263:  */
                    264: static void
                    265: urpctliput(Urp *up, Queue *q, Block *bp)
                    266: {
                    267:        switch(bp->type){
                    268:        case M_HANGUP:
                    269:                up->state |= HUNGUP;
                    270:                wakeup(&up->r);
                    271:                break;
                    272:        }
                    273:        PUTNEXT(q, bp);
                    274: }
                    275: 
                    276: /*
                    277:  *  character mode input.
                    278:  *
                    279:  *  the first byte in every message is a ctl byte (which belongs at the end).
                    280:  */
                    281: void
                    282: urpciput(Queue *q, Block *bp)
                    283: {
                    284:        Urp *up;
                    285:        int i;
                    286:        int ctl;
                    287: 
                    288:        up = (Urp *)q->ptr;
                    289:        if(up == 0)
                    290:                return;
                    291:        if(bp->type != M_DATA){
                    292:                urpctliput(up, q, bp);
                    293:                return;
                    294:        }
                    295: 
                    296:        /*
                    297:         *  get the control character
                    298:         */
                    299:        ctl = *bp->rptr++;
                    300:        if(ctl < 0)
                    301:                return;
                    302: 
                    303:        /*
                    304:         *  take care of any data
                    305:         */
                    306:        if(BLEN(bp)>0  && q->next->len<2*Streamhi && q->next->nb<2*Streambhi){
                    307:                bp->flags |= S_DELIM;
                    308:                urpstat.input += BLEN(bp);
                    309:                PUTNEXT(q, bp);
                    310:        } else
                    311:                freeb(bp);
                    312: 
                    313:        /*
                    314:         *  handle the control character
                    315:         */
                    316:        switch(ctl){
                    317:        case 0:
                    318:                break;
                    319:        case ENQ:
                    320:                DPRINT("rENQ(c)\n");
                    321:                urpstat.enqsr++;
                    322:                sendctl(up, up->lastecho);
                    323:                sendctl(up, ACK+up->iseq);
                    324:                break;
                    325: 
                    326:        case CHECK:
                    327:                DPRINT("rCHECK(c)\n");
                    328:                sendctl(up, ACK+up->iseq);
                    329:                break;
                    330: 
                    331:        case AINIT:
                    332:                DPRINT("rAINIT(c)\n");
                    333:                up->state &= ~INITING;
                    334:                flushinput(up);
                    335:                tryoutput(up);
                    336:                break;
                    337: 
                    338:        case INIT0:
                    339:        case INIT1:
                    340:                DPRINT("rINIT%d(c)\n", ctl-INIT0);
                    341:                sendctl(up, AINIT);
                    342:                if(ctl == INIT1)
                    343:                        q->put = urpiput;
                    344:                initinput(up);
                    345:                break;
                    346: 
                    347:        case INITREQ:
                    348:                DPRINT("rINITREQ(c)\n");
                    349:                initoutput(up, 0);
                    350:                break;
                    351: 
                    352:        case BREAK:
                    353:                break;
                    354: 
                    355:        case REJ+0: case REJ+1: case REJ+2: case REJ+3:
                    356:        case REJ+4: case REJ+5: case REJ+6: case REJ+7:
                    357:                DPRINT("rREJ%d(c)\n", ctl-REJ);
                    358:                rcvack(up, ctl);
                    359:                break;
                    360:        
                    361:        case ACK+0: case ACK+1: case ACK+2: case ACK+3:
                    362:        case ACK+4: case ACK+5: case ACK+6: case ACK+7:
                    363:        case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3:
                    364:        case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7:
                    365:                DPRINT("%s%d(c)\n", (ctl&ECHO)?"rECHO":"rACK", ctl&7);
                    366:                rcvack(up, ctl);
                    367:                break;
                    368: 
                    369:        case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3:
                    370:        case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7:
                    371:                DPRINT("rSEQ%d(c)\n", ctl-SEQ);
                    372:                qlock(&up->ack);
                    373:                i = ctl & Nmask;
                    374:                if(!QFULL(q->next))
                    375:                        sendctl(up, up->lastecho = ECHO+i);
                    376:                up->iseq = i;
                    377:                qunlock(&up->ack);
                    378:                break;
                    379:        }
                    380: }
                    381: 
                    382: /*
                    383:  *  block mode input.
                    384:  *
                    385:  *  the first byte in every message is a ctl byte (which belongs at the end).
                    386:  *
                    387:  *  Simplifying assumption:  one put == one message && the control byte
                    388:  *     is in the first block.  If this isn't true, strange bytes will be
                    389:  *     used as control bytes.
                    390:  *
                    391:  *     There's no input lock.  The channel could be closed while we're
                    392:  *     processing a message.
                    393:  */
                    394: void
                    395: urpiput(Queue *q, Block *bp)
                    396: {
                    397:        Urp *up;
                    398:        int i, len;
                    399:        int ctl;
                    400: 
                    401:        up = (Urp *)q->ptr;
                    402:        if(up == 0)
                    403:                return;
                    404:        if(bp->type != M_DATA){
                    405:                urpctliput(up, q, bp);
                    406:                return;
                    407:        }
                    408: 
                    409:        /*
                    410:         *  get the control character
                    411:         */
                    412:        ctl = *bp->rptr++;
                    413: 
                    414:        /*
                    415:         *  take care of any block count(trx)
                    416:         */
                    417:        while(up->trx){
                    418:                if(BLEN(bp)<=0)
                    419:                        break;
                    420:                switch (up->trx) {
                    421:                case 1:
                    422:                case 2:
                    423:                        up->trbuf[up->trx++] = *bp->rptr++;
                    424:                        continue;
                    425:                default:
                    426:                        up->trx = 0;
                    427:                        break;
                    428:                }
                    429:        }
                    430: 
                    431:        /*
                    432:         *  queue the block(s)
                    433:         */
                    434:        if(BLEN(bp) > 0){
                    435:                bp->flags &= ~S_DELIM;
                    436:                putq(q, bp);
                    437:                if(q->len > 4*1024){
                    438:                        flushinput(up);
                    439:                        return;
                    440:                }
                    441:        } else
                    442:                freeb(bp);
                    443: 
                    444:        /*
                    445:         *  handle the control character
                    446:         */
                    447:        switch(ctl){
                    448:        case 0:
                    449:                break;
                    450:        case ENQ:
                    451:                DPRINT("rENQ %d %uo %uo\n", up->blocks, up->lastecho, ACK+up->iseq);
                    452:                up->blocks = 0;
                    453:                urpstat.enqsr++;
                    454:                sendctl(up, up->lastecho);
                    455:                sendctl(up, ACK+up->iseq);
                    456:                flushinput(up);
                    457:                break;
                    458: 
                    459:        case CHECK:
                    460:                DPRINT("rCHECK\n");
                    461:                sendctl(up, ACK+up->iseq);
                    462:                break;
                    463: 
                    464:        case AINIT:
                    465:                DPRINT("rAINIT\n");
                    466:                up->state &= ~INITING;
                    467:                flushinput(up);
                    468:                tryoutput(up);
                    469:                break;
                    470: 
                    471:        case INIT0:
                    472:        case INIT1:
                    473:                DPRINT("rINIT%d\n", ctl-INIT0);
                    474:                sendctl(up, AINIT);
                    475:                if(ctl == INIT0)
                    476:                        q->put = urpciput;
                    477:                initinput(up);
                    478:                break;
                    479: 
                    480:        case INITREQ:
                    481:                DPRINT("rINITREQ\n");
                    482:                initoutput(up, 0);
                    483:                break;
                    484: 
                    485:        case BREAK:
                    486:                break;
                    487: 
                    488:        case BOT:
                    489:        case BOTM:
                    490:        case BOTS:
                    491:                DPRINT("rBOT%c...", " MS"[ctl-BOT]);
                    492:                up->trx = 1;
                    493:                up->trbuf[0] = ctl;
                    494:                break;
                    495: 
                    496:        case REJ+0: case REJ+1: case REJ+2: case REJ+3:
                    497:        case REJ+4: case REJ+5: case REJ+6: case REJ+7:
                    498:                DPRINT("rREJ%d\n", ctl-REJ);
                    499:                rcvack(up, ctl);
                    500:                break;
                    501:        
                    502:        case ACK+0: case ACK+1: case ACK+2: case ACK+3:
                    503:        case ACK+4: case ACK+5: case ACK+6: case ACK+7:
                    504:        case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3:
                    505:        case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7:
                    506:                DPRINT("%s%d\n", (ctl&ECHO)?"rECHO":"rACK", ctl&7);
                    507:                rcvack(up, ctl);
                    508:                break;
                    509: 
                    510:        /*
                    511:         *  if the sequence number is the next expected
                    512:         *      and the trailer length == 3
                    513:         *      and the block count matches the bytes received
                    514:         *  then send the bytes upstream.
                    515:         */
                    516:        case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3:
                    517:        case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7:
                    518:                len = up->trbuf[1] + (up->trbuf[2]<<8);
                    519:                DPRINT("rSEQ%d(%d,%d,%d)...", ctl-SEQ, up->trx, len, q->len);
                    520:                i = ctl & Nmask;
                    521:                if(up->trx != 3){
                    522:                        urpstat.rjtrs++;
                    523:                        sendrej(up);
                    524:                        break;
                    525:                }else if(q->len != len){
                    526:                        urpstat.rjpks++;
                    527:                        sendrej(up);
                    528:                        break;
                    529:                }else if(i != ((up->iseq+1)&Nmask)){
                    530:                        urpstat.rjseq++;
                    531:                        sendrej(up);
                    532:                        break;
                    533:                }else if(q->next->len > (3*Streamhi)/2
                    534:                        || q->next->nb > (3*Streambhi)/2){
                    535:                        DPRINT("next->len=%d, next->nb=%d\n",
                    536:                                q->next->len, q->next->nb);
                    537:                        flushinput(up);
                    538:                        break;
                    539:                }
                    540:                DPRINT("accept %d\n", q->len);
                    541: 
                    542:                /*
                    543:                 *  send data upstream
                    544:                 */
                    545:                if(q->first) {
                    546:                        if(up->trbuf[0] != BOTM)
                    547:                                q->last->flags |= S_DELIM;
                    548:                        while(bp = getq(q)){
                    549:                                urpstat.input += BLEN(bp);
                    550:                                PUTNEXT(q, bp);
                    551:                        }
                    552:                } else {
                    553:                        bp = allocb(0);
                    554:                        if(up->trbuf[0] != BOTM)
                    555:                                bp->flags |= S_DELIM;
                    556:                        PUTNEXT(q, bp);
                    557:                }
                    558:                up->trx = 0;
                    559: 
                    560:                /*
                    561:                 *  acknowledge receipt
                    562:                 */
                    563:                qlock(&up->ack);
                    564:                up->iseq = i;
                    565:                if(!QFULL(q->next))
                    566:                        sendctl(up, up->lastecho = ECHO|i);
                    567:                qunlock(&up->ack);
                    568:                break;
                    569:        }
                    570: }
                    571: 
                    572: /*
                    573:  *  downstream control
                    574:  */
                    575: Queue *trapq;
                    576: static void
                    577: urpctloput(Urp *up, Queue *q, Block *bp)
                    578: {
                    579:        char *fields[2];
                    580:        int outwin;
                    581: 
                    582:        switch(bp->type){
                    583:        case M_CTL:
                    584:                if(streamparse("break", bp)){
                    585:                        /*
                    586:                         *  send a break as part of the data stream
                    587:                         */
                    588:                        urpstat.output++;
                    589:                        bp->wptr = bp->lim;
                    590:                        bp->rptr = bp->wptr - 1;
                    591:                        *bp->rptr = BREAK;
                    592:                        putq(q, bp);
                    593:                        output(up);
                    594:                        return;
                    595:                }
                    596:                if(streamparse("init", bp)){
                    597:                        outwin = strtoul((char*)bp->rptr, 0, 0);
                    598:                        initoutput(up, outwin);
                    599:                        freeb(bp);
                    600:                        return;
                    601:                }
                    602:                if(streamparse("debug", bp)){
                    603:                        switch(getfields((char *)bp->rptr, fields, 2, " ")){
                    604:                        case 1:
                    605:                                if (strcmp(fields[0], "on") == 0) {
                    606:                                        q->flag |= QDEBUG;
                    607:                                        q->other->flag |= QDEBUG;
                    608:                                }
                    609:                                if (strcmp(fields[0], "off") == 0) {
                    610:                                        q->flag &= ~QDEBUG;
                    611:                                        q->other->flag &= ~QDEBUG;
                    612:                                }
                    613:                        }
                    614:                        freeb(bp);
                    615:                        return;
                    616:                }
                    617:                if(streamparse("trap", bp)){
                    618:                        trapq = q;
                    619:                        return;
                    620:                }
                    621:        }
                    622:        PUTNEXT(q, bp);
                    623: }
                    624: 
                    625: /*
                    626:  *  accept data from a writer
                    627:  */
                    628: static void
                    629: urpoput(Queue *q, Block *bp)
                    630: {
                    631:        Urp *up;
                    632: 
                    633:        up = (Urp *)q->ptr;
                    634: 
                    635:        if(bp->type != M_DATA){
                    636:                urpctloput(up, q, bp);
                    637:                return;
                    638:        }
                    639: 
                    640:        urpstat.output += BLEN(bp);
                    641:        putq(q, bp);
                    642:        output(up);
                    643: }
                    644: 
                    645: /*
                    646:  *  start output
                    647:  */
                    648: static void
                    649: output(Urp *up)
                    650: {
                    651:        Block *bp, *nbp;
                    652:        ulong now;
                    653:        Queue *q;
                    654:        int i;
                    655: 
                    656:        if(!canqlock(&up->xmit))
                    657:                return;
                    658: 
                    659:        if(waserror()){
                    660:                print("urp output error\n");
                    661:                qunlock(&up->xmit);
                    662:                nexterror();
                    663:        }
                    664: 
                    665:        /*
                    666:         *  if still initing and it's time to rexmit, send an INIT1
                    667:         */
                    668:        now = NOW;
                    669:        if(up->state & INITING){
                    670:                if(now > up->timer){
                    671:                        q = up->wq;
                    672:                        DPRINT("INITING timer (%d, %d): ", now, up->timer);
                    673:                        sendctl(up, INIT1);
                    674:                        up->timer = now + MSrexmit;
                    675:                }
                    676:                goto out;
                    677:        }
                    678: 
                    679:        /*
                    680:         *  fill the transmit buffers, `nxb' can never overtake `unechoed'
                    681:         */
                    682:        q = up->wq;
                    683:        i = NEXT(up->nxb);
                    684:        if(i != up->unechoed) {
                    685:                for(bp = getq(q); bp && i!=up->unechoed; i = NEXT(i)){
                    686:                        if(up->xb[up->nxb] != 0)
                    687:                                urpvomit("output", up);
                    688:                        if(BLEN(bp) > up->maxblock){
                    689:                                nbp = up->xb[up->nxb] = allocb(0);
                    690:                                nbp->rptr = bp->rptr;
                    691:                                nbp->wptr = bp->rptr = bp->rptr + up->maxblock;
                    692:                        } else {
                    693:                                up->xb[up->nxb] = bp;
                    694:                                bp = getq(q);
                    695:                        }
                    696:                        up->nxb = i;
                    697:                }
                    698:                if(bp)
                    699:                        putbq(q, bp);
                    700:        }
                    701: 
                    702:        /*
                    703:         *  retransmit cruft
                    704:         */
                    705:        if(up->rexmit){
                    706:                /*
                    707:                 *  if a retransmit is requested, move next back to
                    708:                 *  the unacked blocks
                    709:                 */
                    710:                urpstat.rexmit++;
                    711:                up->rexmit = 0;
                    712:                up->next = up->unacked;
                    713:        } else if(up->unechoed!=up->next && NOW>up->timer){
                    714:                /*
                    715:                 *  if a retransmit time has elapsed since a transmit,
                    716:                 *  send an ENQ
                    717:                 */
                    718:                DPRINT("OUTPUT timer (%d, %d): ", NOW, up->timer);
                    719:                up->timer = NOW + MSrexmit;
                    720:                up->state &= ~REJECTING;
                    721:                urpstat.enqsx++;
                    722:                sendctl(up, ENQ);
                    723:                goto out;
                    724:        }
                    725: 
                    726:        /*
                    727:         *  if there's a window open, push some blocks out
                    728:         *
                    729:         *  the lock is to synchronize with acknowledges that free
                    730:         *  blocks.
                    731:         */
                    732:        while(WINDOW(up)>0 && up->next!=up->nxb){
                    733:                i = up->next;
                    734:                qlock(&up->xl[i]);
                    735:                if(waserror()){
                    736:                        qunlock(&up->xl[i]);
                    737:                        nexterror();
                    738:                }
                    739:                sendblock(up, i);
                    740:                qunlock(&up->xl[i]);
                    741:                up->next = NEXT(up->next);
                    742:                poperror();
                    743:        }
                    744: out:
                    745:        qunlock(&up->xmit);
                    746:        poperror();
                    747: }
                    748: 
                    749: /*
                    750:  *  try output, this is called by an input process
                    751:  */
                    752: void
                    753: tryoutput(Urp *up)
                    754: {
                    755:        if(!waserror()){
                    756:                output(up);
                    757:                poperror();
                    758:        }
                    759: }
                    760: 
                    761: /*
                    762:  *  send a control byte, put the byte at the end of the allocated
                    763:  *  space in case a lower layer needs header room.
                    764:  */
                    765: static void
                    766: sendctl(Urp *up, int ctl)
                    767: {
                    768:        Block *bp;
                    769:        Queue *q;
                    770: 
                    771:        q = up->wq;
                    772:        if(QFULL(q->next))
                    773:                return;
                    774:        bp = allocb(1);
                    775:        bp->wptr = bp->lim;
                    776:        bp->rptr = bp->lim-1;
                    777:        *bp->rptr = ctl;
                    778:        bp->flags |= S_DELIM;
                    779:        DPRINT("sCTL %ulx\n", ctl);
                    780:        PUTNEXT(q, bp);
                    781: }
                    782: 
                    783: /*
                    784:  *  send a reject
                    785:  */
                    786: static void
                    787: sendrej(Urp *up)
                    788: {
                    789:        Queue *q = up->wq;
                    790:        flushinput(up);
                    791:        qlock(&up->ack);
                    792:        if((up->lastecho&~Nmask) == ECHO){
                    793:                DPRINT("REJ %d\n", up->iseq);
                    794:                sendctl(up, up->lastecho = REJ|up->iseq);
                    795:        }
                    796:        qunlock(&up->ack);
                    797: }
                    798: 
                    799: /*
                    800:  *  send an acknowledge
                    801:  */
                    802: static void
                    803: sendack(Urp *up)
                    804: {
                    805:        /*
                    806:         *  check the precondition for acking
                    807:         */
                    808:        if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq)
                    809:                return;
                    810: 
                    811:        if(!canqlock(&up->ack))
                    812:                return;
                    813: 
                    814:        /*
                    815:         *  check again now that we've locked
                    816:         */
                    817:        if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq){
                    818:                qunlock(&up->ack);
                    819:                return;
                    820:        }
                    821: 
                    822:        /*
                    823:         *  send the ack
                    824:         */
                    825:        { Queue *q = up->wq; DPRINT("sendack: "); }
                    826:        sendctl(up, up->lastecho = ECHO|up->iseq);
                    827:        qunlock(&up->ack);
                    828: }
                    829: 
                    830: /*
                    831:  *  send a block.
                    832:  */
                    833: static void
                    834: sendblock(Urp *up, int bn)
                    835: {
                    836:        Block *bp, *m, *nbp;
                    837:        int n;
                    838:        Queue *q;
                    839: 
                    840:        q = up->wq;
                    841:        up->timer = NOW + MSrexmit;
                    842:        if(QFULL(q->next))
                    843:                return;
                    844: 
                    845:        /*
                    846:         *  message 1, the BOT and the data
                    847:         */
                    848:        bp = up->xb[bn];
                    849:        if(bp == 0)
                    850:                return;
                    851:        m = allocb(1);
                    852:        m->rptr = m->lim - 1;
                    853:        m->wptr = m->lim;
                    854:        *m->rptr = (bp->flags & S_DELIM) ? BOT : BOTM;
                    855:        nbp = allocb(0);
                    856:        nbp->rptr = bp->rptr;
                    857:        nbp->wptr = bp->wptr;
                    858:        nbp->base = bp->base;
                    859:        nbp->lim = bp->lim;
                    860:        nbp->flags |= S_DELIM;
                    861:        if(bp->type == M_CTL){
                    862:                PUTNEXT(q, nbp);
                    863:                m->flags |= S_DELIM;
                    864:                PUTNEXT(q, m);
                    865:        } else {
                    866:                m->next = nbp;
                    867:                PUTNEXT(q, m);
                    868:        }
                    869: 
                    870:        /*
                    871:         *  message 2, the block length and the SEQ
                    872:         */
                    873:        m = allocb(3);
                    874:        m->rptr = m->lim - 3;
                    875:        m->wptr = m->lim;
                    876:        n = BLEN(bp);
                    877:        m->rptr[0] = SEQ | bn;
                    878:        m->rptr[1] = n;
                    879:        m->rptr[2] = n>>8;
                    880:        m->flags |= S_DELIM;
                    881:        PUTNEXT(q, m);
                    882:        DPRINT("sb %d (%d)\n", bn, up->timer);
                    883: }
                    884: 
                    885: /*
                    886:  *  receive an acknowledgement
                    887:  */
                    888: static void
                    889: rcvack(Urp *up, int msg)
                    890: {
                    891:        int seqno;
                    892:        int next;
                    893:        int i;
                    894: 
                    895:        seqno = msg&Nmask;
                    896:        next = NEXT(seqno);
                    897: 
                    898:        /*
                    899:         *  release any acknowledged blocks
                    900:         */
                    901:        if(IN(seqno, up->unacked, up->next)){
                    902:                for(; up->unacked != next; up->unacked = NEXT(up->unacked)){
                    903:                        i = up->unacked;
                    904:                        qlock(&up->xl[i]);
                    905:                        if(up->xb[i])
                    906:                                freeb(up->xb[i]);
                    907:                        up->xb[i] = 0;
                    908:                        qunlock(&up->xl[i]);
                    909:                }
                    910:        }
                    911: 
                    912:        switch(msg & 0370){
                    913:        case ECHO:
                    914:                if(IN(seqno, up->unechoed, up->next)) {
                    915:                        up->unechoed = next;
                    916:                }
                    917:                /*
                    918:                 *  the next reject at the start of a window starts a 
                    919:                 *  retransmission.
                    920:                 */
                    921:                up->state &= ~REJECTING;
                    922:                break;
                    923:        case REJ:
                    924:                if(IN(seqno, up->unechoed, up->next))
                    925:                        up->unechoed = next;
                    926:                /*
                    927:                 *  ... FALL THROUGH ...
                    928:                 */
                    929:        case ACK:
                    930:                /*
                    931:                 *  start a retransmission if we aren't retransmitting
                    932:                 *  and this is the start of a window.
                    933:                 */
                    934:                if(up->unechoed==next && !(up->state & REJECTING)){
                    935:                        up->state |= REJECTING;
                    936:                        up->rexmit = 1;
                    937:                }
                    938:                break;
                    939:        }
                    940: 
                    941:        tryoutput(up);
                    942:        if(up->state & CLOSING)
                    943:                wakeup(&up->r);
                    944: }
                    945: 
                    946: /*
                    947:  * throw away any partially collected input
                    948:  */
                    949: static void
                    950: flushinput(Urp *up)
                    951: {
                    952:        Block *bp;
                    953: 
                    954:        while (bp = getq(up->rq))
                    955:                freeb(bp);
                    956:        up->trx = 0;
                    957: }
                    958: 
                    959: /*
                    960:  *  initialize output
                    961:  */
                    962: static void
                    963: initoutput(Urp *up, int window)
                    964: {
                    965:        int i;
                    966: 
                    967:        /*
                    968:         *  set output window
                    969:         */
                    970:        up->maxblock = window/4;
                    971:        if(up->maxblock < 64)
                    972:                up->maxblock = 64;
                    973:        up->maxblock -= 4;
                    974:        up->maxout = 4;
                    975: 
                    976:        /*
                    977:         *  set sequence varialbles
                    978:         */
                    979:        up->unechoed = 1;
                    980:        up->unacked = 1;
                    981:        up->next = 1;
                    982:        up->nxb = 1;
                    983:        up->rexmit = 0;
                    984: 
                    985:        /*
                    986:         *  free any outstanding blocks
                    987:         */
                    988:        for(i = 0; i < 8; i++){
                    989:                qlock(&up->xl[i]);
                    990:                if(up->xb[i])
                    991:                        freeb(up->xb[i]);
                    992:                up->xb[i] = 0;
                    993:                qunlock(&up->xl[i]);
                    994:        }
                    995: 
                    996:        /*
                    997:         *  tell the other side we've inited
                    998:         */
                    999:        up->state |= INITING;
                   1000:        up->timer = NOW + MSrexmit;
                   1001:        { Queue *q = up->wq; DPRINT("initoutput (%d): ", up->timer); }
                   1002:        sendctl(up, INIT1);
                   1003: }
                   1004: 
                   1005: /*
                   1006:  *  initialize input
                   1007:  */
                   1008: static void
                   1009: initinput(Urp *up)
                   1010: {
                   1011:        /*
                   1012:         *  restart all sequence parameters
                   1013:         */
                   1014:        up->blocks = 0;
                   1015:        up->trx = 0;
                   1016:        up->iseq = 0;
                   1017:        up->lastecho = ECHO+0;
                   1018:        flushinput(up);
                   1019: }
                   1020: 
                   1021: static void
                   1022: urpkproc(void *arg)
                   1023: {
                   1024:        Urp *up;
                   1025: 
                   1026:        USED(arg);
                   1027: 
                   1028:        if(waserror())
                   1029:                ;
                   1030: 
                   1031:        for(;;){
                   1032:                for(up = urpalloc.urp; up; up = up->list){
                   1033:                        if(up->state==0 || (up->state&HUNGUP))
                   1034:                                continue;
                   1035:                        if(!canqlock(up))
                   1036:                                continue;
                   1037:                        if(waserror()){
                   1038:                                qunlock(up);
                   1039:                                continue;
                   1040:                        }
                   1041:                        if(up->state==0 || (up->state&HUNGUP)){
                   1042:                                qunlock(up);
                   1043:                                poperror();
                   1044:                                continue;
                   1045:                        }
                   1046:                        if(up->iseq!=(up->lastecho&7) && !QFULL(up->rq->next))
                   1047:                                sendack(up);
                   1048:                        output(up);
                   1049:                        qunlock(up);
                   1050:                        poperror();
                   1051:                }
                   1052:                tsleep(&urpkr, return0, 0, 500);
                   1053:        }
                   1054: }
                   1055: 
                   1056: /*
                   1057:  *  urp got very confused, complain
                   1058:  */
                   1059: static void
                   1060: urpvomit(char *msg, Urp* up)
                   1061: {
                   1062:        print("urpvomit: %s %ux next %d unechoed %d unacked %d nxb %d\n",
                   1063:                msg, up, up->next, up->unechoed, up->unacked, up->nxb);
                   1064:        print("\txb: %ux %ux %ux %ux %ux %ux %ux %ux\n",
                   1065:                up->xb[0], up->xb[1], up->xb[2], up->xb[3], up->xb[4], 
                   1066:                up->xb[5], up->xb[6], up->xb[7]);
                   1067:        print("\tiseq: %uo lastecho: %uo trx: %d trbuf: %uo %uo %uo\n",
                   1068:                up->iseq, up->lastecho, up->trx, up->trbuf[0], up->trbuf[1],
                   1069:                up->trbuf[2]);
                   1070:        print("\tupq: %ux %d %d\n", &up->rq->next->r,  up->rq->next->nb,
                   1071:                up->rq->next->len);
                   1072: }
                   1073: 
                   1074: void
                   1075: urpfillstats(Chan *c, char *buf, int len)
                   1076: {
                   1077:        char b[256];
                   1078: 
                   1079:        USED(c);
                   1080:        sprint(b, "in: %d\nout: %d\nrexmit: %d\nrjtrs: %d\nrjpks: %d\nrjseq: %d\nenqsx: %d\nenqsr: %d\n",
                   1081:                urpstat.input, urpstat.output, urpstat.rexmit, urpstat.rjtrs,
                   1082:                urpstat.rjpks, urpstat.rjseq, urpstat.enqsx, urpstat.enqsr);
                   1083:        strncpy(buf, b, len);
                   1084: }

unix.superglobalmegacorp.com

This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.