|
|
1.1 root 1: /* Copyright 1990, AT&T Bell Labs */
2: #include <stdlib.h>
3: #include <string.h>
4: #include "fsort.h"
5:
/* Diagnostic texts handed to fatal()/syncerr() in this file. */
6: char *disorder = "key out of order";
7: char *duplicate = "duplicate key";
8: char *space = "out of space for merging";
9:
/* Result codes of fillrec(): input exhausted, or a record is available. */
10: enum { IEOF, IOK };
/* Bits stored through find()'s flags argument when the new record
   duplicates an existing one and uflag is set. */
11: enum { IPRE = 01000, IFOL = 02000 }; /* see find() */
12: enum { NMERGE = 16 };
13:
14: int nmerge = NMERGE; /* max number of files to merge at once */
/* Number passed to filename() for the next temp file to create;
   merge1() and merge() post-increment it. */
15: int nextfile;
16:
17: struct merge { /* current record in an open file */
18: char *name; /* name of file (for diagnostics) */
19: FILE *file; /* stream opened with fileopen(name, "r") in recinit()/check() */
20: struct rec *rec; /* pointer to the data (and key) */
21: short serial; /* file no., breaks ties for stable sort */
22: short del; /* delete at close */
23: };
24:
/* Both arrays are allocated in merge() with nmerge+1 entries; the extra
   entry is the scratch slot used by mergephase(). */
25: struct merge *mfile; /* one struct per open file */
26: struct merge **f; /* pointers to mfile, ordered by content */
27:
/* Forward declarations for the helpers defined later in this file.
   link() and unlink() are declared by hand here rather than through a
   system header. */
28: static void mergephase(int, char*);
29: static int find(int, int*);
30: static int fillrec(struct merge*);
31: static int compare(struct merge*, struct merge*);
32: static void syncerr(struct merge*, char*);
33: extern int link(char*, char*);
34: extern int unlink(char*);
35: static void recinit(int nf, int nm, int n);
36: static void mv(int, int);
37:
38: static void
39: recalloc(struct merge *m)
40: {
41: if(m->rec)
42: return; /* hygiene; doesn't happen */
43: m->rec = (struct rec*)malloc(MINREC);
44: if(m->rec == 0)
45: fatal(space, "", 0);
46: m->rec->dlen = 0;
47: m->rec->next = (struct rec*)((uchar*)m->rec + MINREC);
48: }
49:
50: /* misfortune : all fields are parsed and converted
51: before comparison. Lazy parsing might be in order,
52: but it's too hard to contemplate */
53:
54: /* Merge strategy, to merge N inputs in groups of at most M.
55: Numbers, like (1), are keyed to comments in code.
56: If N <= M, merge everything. (4)
57: If N > M*(M-1)+1
58: greedily merge M at a time (1) to reduce to the next case.
59: Merge x bunches of M (3) and one bunch of y (2),
60: then merge the x+1 new files (4) with the remaining z files
61: in an exactly M-way merge. Calculate x,y,z thus:
62: Mx + y + z = N file count
63: x + 1 + z = M final merge
64: 2 <= y <= M
65: x >= 0
66: z >= 0
67: by algebra
68: (M-1)x + y = N-M+1
69: y == (N-M+1) mod (M-1), 2 <= y <= M
70: x == (N-M+1-y)/(M-1)
71: Total cost is Mx + y + N. (Strategy due to P. McIlroy.) */
72:
73: /* merge n files. the first unmerged temp file is nm (i.e.
74: nm is the number of already merged files). the first
75: unmerged input file is nf.
76: rename temp files to keep a dense set beginning at 0
77: (It seems silly to rename 100 files after merging
78: every 1; how much time does that actually cost?) */
79: void
80: merge1(int nf, int nm, int n)
81: {
82: char *name;
83: int i, j;
84: int flag = nf<nfiles;
85: recinit(nf, nm, n);
86: name = filename(nextfile++);
87: mergephase(n, name);
88: free(name);
89: if(flag)
90: return;
91: mv(--nextfile, nm);
92: for(i=nm+n, j=nm+1; i<nextfile; i++, j++)
93: mv(i, j);
94: nextfile -= n - 1;
95: }
96:
97: /* if flag = 0 merge input files; else temps only.
98: this program is specialized to two levels of merge;
99: should fix that some day. */
100:
101: void
102: merge(int flag)
103: {
104: char buf[BUFSIZ];
105: FILE *input, *output;
106: int n, y;
107: char *name;
108: int nf = 0; /* next input file to merge */
109: int nm = 0; /* total already merged once */
110: int nt = nfiles; /* files as yet unmerged */
111: if(flag) {
112: nf = nfiles;
113: nt = nextfile;
114: }
115: if(mfile == 0)
116: mfile = (struct merge*)calloc(nmerge+1,
117: sizeof(*mfile));
118: if(f == 0)
119: f = (struct merge**)calloc(nmerge+1,
120: sizeof(*f));
121: if(mfile==0 || f==0)
122: fatal(space,"",0);
123:
124: if(nt > nmerge*(nmerge-1)+1) { /* (1) */
125: for(nm=0; nt>0; ) {
126: n = nt>nmerge? nmerge: nt;
127: merge1(nf, nm, n);
128: if(flag) nf += n;
129: nt -= n;
130: nm++;
131: }
132: merge(1);
133: return;
134: }
135: if(nt > nmerge) { /* (2) */
136: y = (nt-nmerge+1)%(nmerge-1);
137: if(y <= 1) y += nmerge-1;
138: merge1(nf, nm, y);
139: nf = nf+y>=nfiles? nfiles: nf+y;
140: nt -= y;
141: nm++;
142: }
143: while(nt+nm > nmerge) { /* (3) */
144: merge1(nf, nm, nmerge);
145: nf = nf+nmerge>=nfiles? nfiles: nf+nmerge;
146: nt -= nmerge;
147: nm++;
148: }
149:
150: n = nm+nt; /* (4) */
151: recinit(nf, 0, n);
152: if(!overwrite(nf))
153: name = oname;
154: else
155: name = filename(nextfile++);
156: mergephase(n, name);
157: if(name == oname)
158: return;
159: input = fileopen(name, "r");
160: output = fileopen(oname, "w");
161: while(n = fread(buf, 1, sizeof(buf), input))
162: if(fwrite(buf, 1, n, output) != n)
163: fatal("error writing", oname, 0);
164: fileclose(input, name);
165: unlink(name);
166: free(name);
167: fileclose(output, oname);
168: }
169:
170: /* Initialize merge records for n files, beginning with
171: input file number nf and temp file number nm.
172:
173: There are nfiles-nf input files; any files beyond that
174: number must be temporaries, which come from already
175: merged inputs. (The only time that both kinds of
176: files are present is the very last merge, when for
177: stability, the temps are regarded as earlier.) */
178:
179: static void
180: recinit(int nf, int nm, int n)
181: {
182: int i;
183: struct merge *m;
184: int last = nfiles-nf + nextfile-nm == n;
185: for(i=0; i<=n; i++) /* one extra, for mergephase() */
186: recalloc(&mfile[i]);
187: for(i=0; i<n; i++) {
188: m = &mfile[i];
189: m->del = nf>=nfiles || last && i<nextfile;
190: m->name = m->del? filename(nm++): files[nf++];
191: m->file = fileopen(m->name, "r");
192: m->serial = i;
193: f[i] = m;
194: }
195: f[n] = &mfile[n];
196: }
197:
/*
 * Rename temp file number i to temp file number j by linking the new
 * name and unlinking the old one.  Any existing file j is removed
 * first.  Failure of either step is fatal.  Nothing is done when
 * i == j.
 */
static void
mv(int i, int j)
{
	char *src, *dst;

	if(i != j) {	/* i == j is believed not to happen */
		src = filename(i);
		dst = filename(j);
		unlink(dst);
		if(link(src, dst) == -1)
			fatal("cannot move", src, 0);
		else if(unlink(src) == -1)
			fatal("cannot move", src, 0);
		free(src);
		free(dst);
	}
}
211:
212: static void
213: emit(FILE *output)
214: {
215: struct merge *m = f[0];
216: int n = m->rec->dlen;
217: uchar *p = data(m->rec); /* hanky panky for speed */
218: uchar *e = p + n++; /* use fwrite instead of */
219: int c = *e; /* printf */
220: *e = '\n';
221: if(fwrite((char*)p, 1, n, output) != n)
222: fatal("error writing", m->name, 0);
223: *e = c;
224: }
225:
226: /* Merge files in mfile[0]..mfile[n-1].
227: f[0]..f[l-1] points to files in increasing order
228: of their current records. All the current records
229: are strictly ordered, either by tie-breaking, or
230: by skipping records that compare equal under -u.
231: There are two phases: (a) initialization (reading
232: from a file when there is no record at hand) and
233: (b) continuation (reading the next record with the
234: intent of displacing the current record).
235: These conditions arise, at places numbered in comments
236: (1) the record compares equal to another, and would follow
237: that other if ties were broken
238: (2) the record compares equal to another, and would precede
239: that other if ties were broken
240: (3) the record falls between two others
241:
242: At all times the keys pointed to by f[0]..f[l]
243: are distinct (perhaps due to tie-breaking), as
244: are the files they come from. When a new key
245: is read, its proper home (in tie-broken order)
246: is between f[j-1] and f[j].
247: The picture below shows the hardest case (2b),
248: where key a from file F is about to be emitted.
249: All keys in the sequence axby are distinct, as
250: are the files they come from. A new key is read
251: from file F into the extra space pointed to by
252: f[i]. (Keeping one extra key allows disorder to
253: be detected almost for free.) The
254: new key is equal to key b pointed to by f[j]
255: and file F precedes file G in stability order.
256: Thus b(F) deserves to be kept and b(G) discarded.
257: 0 j l=n
258: --------------------------------------------
259: | a(F) | x | b(G) | y | b(F) |
260: --------------------------------------------
261:
262: We emit the record at 0, and displace the key
263: at j, from whose file there is now no record
264: present; l decreases by one.
265: 0 j l n
266: --------------------------------------------
267: | x | b(F) | y | ?(G) | |
268: --------------------------------------------
269:
270: This takes us back to initialization. From
271: this configuration it is possible to reach
272: case (2a), like (2b) except that the key at 0
273: is not to be emitted; (2a) cannot happen during
274: the original initialization.
275: */
276:
/*
 * Shift the n elements starting at a one position up (toward higher
 * addresses) or down.  The arguments are parenthesized so that an
 * expression such as f+j is handled as a unit; without that,
 * sizeof(*a) would be taken of (*f + j) rather than of the element.
 */
#define moveup(a,n) memmove((a)+1, (a), (n)*sizeof(*(a)))
#define movedown(a,n) memmove((a)-1, (a), (n)*sizeof(*(a)))
279:
/*
 * Merge the n files set up by recinit() into the file called name.
 * f[0..l-1] point at the slots whose current records are in order,
 * and f[l] is the spare slot that the next record is read into.
 * Loop (a) fills slots until every open file has a record in place;
 * loop (b) then repeatedly emits f[0] and replaces it with the next
 * record from the same file.  n shrinks as files reach end of file.
 * An out-of-order record is reported through syncerr().
 */
280: static void
281: mergephase(int n, char *name)
282: {
283: int j, flags;
284: struct merge *mp;
285: struct rec *r;
286: int l = 0; /* number of slots currently in order */
287: int k = -1; /* for spotting sync errors */
288: FILE *output = fileopen(name, "w");
289: init: while(l < n) { /*(a)*/
290: mp = f[l];
291: if(fillrec(mp) == IEOF) { /* this file has no record to offer */
292: movedown(f+l+1, n-l); /* close the gap: f[l+1..n] -> f[l..n-1] */
293: f[n--] = mp; /* park the finished slot beyond the live range */
294: continue;
295: }
296: j = find(l, &flags); /* position the new record belongs at */
297: if(j <= k) /* k is -1 on first entry; (2a)/(2b) raise it */
298: syncerr(mp, disorder);
299: if(flags & IFOL) { /*(1a)*/
300: if(!aflag || doaccum(f[j-1]->rec, mp->rec))
301: continue; /* duplicate dropped; read this file again */
302: } else if(flags & IPRE) /*(2a)*/
303: if(!aflag || doaccum(mp->rec, f[j]->rec)) {
304: f[l] = f[j]; /* the displaced file must be read next */
305: f[j] = mp; /* the new record takes its place */
306: k = j;
307: continue;
308: }
/* also reached when aflag is set and doaccum() returned zero */
309: moveup(f+j, l-j); /*(3a)*/
310: f[j] = mp;
311: l++;
312: }
313: while(n > 0) { /*(b)*/
314: mp = f[l];
315: r = mp->rec; /* keep the spare slot's own buffer */
316: *mp = *f[0]; /* spare slot now stands for the file at f[0] */
317: mp->rec = r;
318: if(fillrec(mp) == IEOF) { /* that file is done */
319: emit(output);
320: mp = f[0];
321: movedown(f+1, l);
322: f[--n] = mp;
323: l = n;
324: continue;
325: }
326: j = find(l, &flags);
327: if(j == 0) /* new record would sort before the one at f[0] */
328: syncerr(mp, disorder);
329: if(flags & IFOL) { /*(1b)*/
330: if(!aflag || doaccum(f[j-1]->rec, mp->rec))
331: continue; /* duplicate dropped; f[0] is not emitted yet */
332: } else if(flags & IPRE) /*(2b)*/
333: if(!aflag || doaccum(mp->rec, f[j]->rec)) {
334: emit(output);
335: f[l] = f[j];
336: f[j] = mp;
337: mp = f[0];
338: movedown(f+1, l);
339: f[l--] = mp;
340: k = j - 1;
341: goto init; /* the displaced file needs a fresh record */
342: }
343: emit(output); /*(3b)*/
344: mp = f[0];
345: movedown(f+1, j-1); /* f[1..j-1] -> f[0..j-2] */
346: f[j-1] = f[l]; /* new record goes in at its sorted place */
347: f[l] = mp; /* the emitted slot becomes the spare */
348: }
349:
350: fileclose(output, name);
351: }
352:
353: static int
354: fillrec(struct merge *mp)
355: {
356: struct rec *r = getline(mp->rec, mp->file);
357: if(r == 0)
358: return IOK;
359: if(r == ENDFILE) {
360: fileclose(mp->file, mp->name);
361: if(mp->del)
362: unlink(mp->name);
363: return IEOF;
364: }
365: mp->rec = r;
366: return IOK;
367: }
368:
369: /*
370: Return the proper index for the extra record f[l]
371: to be inserted before (regardless of -u).
372: Under -u, if the extra record is a duplicate, return
373: the index flagged by IFOL or IPRE according as
374: the insertion position follows or precedes a
375: record of the same value.
376: */
377:
378: static int
379: find(int l, int *flags)
380: {
381: int i, t;
382: int bot = 0;
383: int top = l;
384: while(bot < top) {
385: i = (bot+top)/2;
386: t = compare(f[l], f[i]);
387: if(t < 0)
388: top = i;
389: else if(t > 0)
390: bot = i + 1;
391: else if(uflag) {
392: if(f[l]->serial < f[i]->serial) {
393: *flags = IPRE;
394: return i;
395: } else {
396: *flags = IFOL;
397: return i + 1;
398: }
399: }
400: else if(f[l]->serial < f[i]->serial)
401: top = i;
402: else
403: bot = i + 1;
404: }
405: *flags = 0;
406: return top;
407: }
408:
409: static int
410: compare(struct merge *mi, struct merge *mj)
411: {
412: uchar *ip, *jp;
413: uchar *ei, *ej;
414: uchar *trans, *keep;
415: int li, lj, k;
416: if(simplekeyed) {
417: trans = fields->trans;
418: keep = fields->keep;
419: ip = data(mi->rec);
420: jp = data(mj->rec);
421: ei = ip + mi->rec->dlen;
422: ej = jp + mj->rec->dlen;
423: for( ; ; ip++, jp++) {
424: while(ip<ei && !keep[*ip]) ip++;
425: while(jp<ej && !keep[*jp]) jp++;
426: if(ip>=ei || jp>=ej) break;
427: k = trans[*ip] - trans[*jp];
428: if(k != 0) break;
429: }
430: if(ip<ei && jp<ej)
431: return k*signedrflag;
432: else if(ip < ei)
433: return signedrflag;
434: else if(jp < ej)
435: return -signedrflag;
436: else if(sflag)
437: return 0;
438: } else if(keyed) {
439: ip = key(mi->rec);
440: jp = key(mj->rec);
441: li = mi->rec->klen;
442: lj = mj->rec->klen;
443: for(k=li<lj?li:lj; --k>=0; ip++, jp++)
444: if(*ip != *jp)
445: break;
446: if(k < 0) {
447: if(li != lj)
448: fatal("theorem disproved","",0);
449: if(sflag)
450: return 0;
451: } else
452: return *ip - *jp;
453:
454: }
455: li = mi->rec->dlen;
456: lj = mj->rec->dlen;
457: ip = data(mi->rec);
458: jp = data(mj->rec);
459: for(k=li<lj?li:lj; --k>=0; ip++, jp++)
460: if(*ip != *jp)
461: break;
462: return (k<0? li-lj: *ip-*jp)*signedrflag;
463: }
464:
465: void
466: check(char *name)
467: {
468: int i, t;
469:
470: mfile = (struct merge*)calloc(2,sizeof(*mfile));
471: recalloc(&mfile[0]);
472: recalloc(&mfile[1]);
473: mfile[0].name = mfile[1].name = name;
474: mfile[0].file = mfile[1].file = fileopen(name, "r");
475: if(mfile[0].file == 0)
476: fatal("can't open ", name, 0);
477:
478: if(fillrec(&mfile[0]) == IEOF)
479: return;
480: for(i=1; fillrec(&mfile[i])!=IEOF; i^=1) {
481: t = compare(&mfile[i^1], &mfile[i]);
482: if(t>0)
483: syncerr(&mfile[i], disorder);
484: else if(t==0 && uflag)
485: syncerr(&mfile[i], duplicate);
486: }
487: }
488:
489: static void
490: syncerr(struct merge *m, char *message)
491: {
492: int n = m->rec->dlen;
493: warn("trouble in file", m->name, 0);
494: fatal(message, n?(char*)data(m->rec):"", n);
495: }
This archive runs on limited infrastructure. Preserving old code on modern bandwidth. Automated agents are requested to crawl responsibly.