|
| 1 | +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; |
| 2 | +import * as fs from 'fs'; |
| 3 | +import * as path from 'path'; |
| 4 | +import * as os from 'os'; |
| 5 | +import { StreamingJSONLParser } from '../streaming-jsonl-parser.js'; |
| 6 | + |
| 7 | +describe('StreamingJSONLParser', () => { |
| 8 | + let tmpDir: string; |
| 9 | + let parser: StreamingJSONLParser; |
| 10 | + |
| 11 | + beforeEach(() => { |
| 12 | + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'jsonl-test-')); |
| 13 | + parser = new StreamingJSONLParser(); |
| 14 | + }); |
| 15 | + |
| 16 | + afterEach(() => { |
| 17 | + fs.rmSync(tmpDir, { recursive: true, force: true }); |
| 18 | + }); |
| 19 | + |
| 20 | + function writeJsonl(name: string, lines: unknown[]): string { |
| 21 | + const fp = path.join(tmpDir, name); |
| 22 | + fs.writeFileSync(fp, lines.map((l) => JSON.stringify(l)).join('\n') + '\n'); |
| 23 | + return fp; |
| 24 | + } |
| 25 | + |
| 26 | + describe('parseAll', () => { |
| 27 | + it('parses all lines from a JSONL file', async () => { |
| 28 | + const fp = writeJsonl('basic.jsonl', [ |
| 29 | + { id: 1, name: 'a' }, |
| 30 | + { id: 2, name: 'b' }, |
| 31 | + { id: 3, name: 'c' }, |
| 32 | + ]); |
| 33 | + |
| 34 | + const result = await parser.parseAll(fp); |
| 35 | + expect(result).toHaveLength(3); |
| 36 | + expect(result[0]).toEqual({ id: 1, name: 'a' }); |
| 37 | + }); |
| 38 | + |
| 39 | + it('skips invalid JSON lines', async () => { |
| 40 | + const fp = path.join(tmpDir, 'bad.jsonl'); |
| 41 | + fs.writeFileSync(fp, '{"ok":true}\nnot json\n{"ok":false}\n'); |
| 42 | + |
| 43 | + const result = await parser.parseAll(fp); |
| 44 | + expect(result).toHaveLength(2); |
| 45 | + }); |
| 46 | + |
| 47 | + it('skips empty lines', async () => { |
| 48 | + const fp = path.join(tmpDir, 'empty.jsonl'); |
| 49 | + fs.writeFileSync(fp, '{"a":1}\n\n\n{"b":2}\n'); |
| 50 | + |
| 51 | + const result = await parser.parseAll(fp); |
| 52 | + expect(result).toHaveLength(2); |
| 53 | + }); |
| 54 | + |
| 55 | + it('applies filter', async () => { |
| 56 | + const fp = writeJsonl('filter.jsonl', [ |
| 57 | + { type: 'error', msg: 'bad' }, |
| 58 | + { type: 'info', msg: 'ok' }, |
| 59 | + { type: 'error', msg: 'worse' }, |
| 60 | + ]); |
| 61 | + |
| 62 | + const result = await parser.parseAll(fp, { |
| 63 | + filter: (obj) => obj.type === 'error', |
| 64 | + }); |
| 65 | + expect(result).toHaveLength(2); |
| 66 | + }); |
| 67 | + |
| 68 | + it('applies transform', async () => { |
| 69 | + const fp = writeJsonl('transform.jsonl', [{ x: 1 }, { x: 2 }]); |
| 70 | + |
| 71 | + const result = await parser.parseAll(fp, { |
| 72 | + transform: (obj) => ({ ...obj, doubled: obj.x * 2 }), |
| 73 | + }); |
| 74 | + expect(result[0]).toEqual({ x: 1, doubled: 2 }); |
| 75 | + expect(result[1]).toEqual({ x: 2, doubled: 4 }); |
| 76 | + }); |
| 77 | + }); |
| 78 | + |
| 79 | + describe('parseStream', () => { |
| 80 | + it('yields batches of specified size', async () => { |
| 81 | + const lines = Array.from({ length: 10 }, (_, i) => ({ i })); |
| 82 | + const fp = writeJsonl('batch.jsonl', lines); |
| 83 | + |
| 84 | + const batches: unknown[][] = []; |
| 85 | + for await (const batch of parser.parseStream(fp, { batchSize: 3 })) { |
| 86 | + batches.push(batch); |
| 87 | + } |
| 88 | + |
| 89 | + expect(batches).toHaveLength(4); // 3+3+3+1 |
| 90 | + expect(batches[0]).toHaveLength(3); |
| 91 | + expect(batches[3]).toHaveLength(1); |
| 92 | + }); |
| 93 | + |
| 94 | + it('calls onProgress callback', async () => { |
| 95 | + const fp = writeJsonl('progress.jsonl', [{ a: 1 }, { a: 2 }, { a: 3 }]); |
| 96 | + const progressCalls: number[] = []; |
| 97 | + |
| 98 | + const results: unknown[] = []; |
| 99 | + for await (const batch of parser.parseStream(fp, { |
| 100 | + batchSize: 2, |
| 101 | + onProgress: (n) => progressCalls.push(n), |
| 102 | + })) { |
| 103 | + results.push(...batch); |
| 104 | + } |
| 105 | + |
| 106 | + expect(results).toHaveLength(3); |
| 107 | + expect(progressCalls.length).toBeGreaterThan(0); |
| 108 | + }); |
| 109 | + |
| 110 | + it('skips oversized lines', async () => { |
| 111 | + const fp = path.join(tmpDir, 'oversized.jsonl'); |
| 112 | + const bigLine = JSON.stringify({ data: 'x'.repeat(200) }); |
| 113 | + const smallLine = JSON.stringify({ data: 'ok' }); |
| 114 | + fs.writeFileSync(fp, `${smallLine}\n${bigLine}\n${smallLine}\n`); |
| 115 | + |
| 116 | + const result = await parser.parseAll(fp, { maxLineLength: 100 }); |
| 117 | + expect(result).toHaveLength(2); |
| 118 | + }); |
| 119 | + }); |
| 120 | + |
| 121 | + describe('process', () => { |
| 122 | + it('processes batches with custom function', async () => { |
| 123 | + const fp = writeJsonl('process.jsonl', [{ v: 1 }, { v: 2 }, { v: 3 }]); |
| 124 | + |
| 125 | + const sums = await parser.process<{ v: number }, number>( |
| 126 | + fp, |
| 127 | + async (items) => items.reduce((s, i) => s + i.v, 0), |
| 128 | + { batchSize: 2 } |
| 129 | + ); |
| 130 | + |
| 131 | + // batch1: 1+2=3, batch2: 3 |
| 132 | + expect(sums).toEqual([3, 3]); |
| 133 | + }); |
| 134 | + }); |
| 135 | + |
| 136 | + describe('countLines', () => { |
| 137 | + it('counts all lines including empty', async () => { |
| 138 | + const fp = path.join(tmpDir, 'count.jsonl'); |
| 139 | + fs.writeFileSync(fp, '{"a":1}\n{"a":2}\n{"a":3}\n'); |
| 140 | + |
| 141 | + const count = await parser.countLines(fp); |
| 142 | + // 3 content lines + 1 trailing empty line from final \n |
| 143 | + expect(count).toBeGreaterThanOrEqual(3); |
| 144 | + }); |
| 145 | + }); |
| 146 | + |
| 147 | + describe('sampleLines', () => { |
| 148 | + it('throws for invalid sample rate', async () => { |
| 149 | + const fp = writeJsonl('sample.jsonl', [{ a: 1 }]); |
| 150 | + |
| 151 | + await expect(async () => { |
| 152 | + for await (const _ of parser.sampleLines(fp, 0)) { |
| 153 | + // consume |
| 154 | + } |
| 155 | + }).rejects.toThrow(/Sample rate must be between 0 and 1/); |
| 156 | + }); |
| 157 | + |
| 158 | + it('yields subset of lines at rate 1.0', async () => { |
| 159 | + const lines = Array.from({ length: 5 }, (_, i) => ({ i })); |
| 160 | + const fp = writeJsonl('sample-all.jsonl', lines); |
| 161 | + |
| 162 | + const results: unknown[] = []; |
| 163 | + for await (const item of parser.sampleLines(fp, 1.0)) { |
| 164 | + results.push(item); |
| 165 | + } |
| 166 | + expect(results).toHaveLength(5); |
| 167 | + }); |
| 168 | + }); |
| 169 | + |
| 170 | + describe('createTransformStream', () => { |
| 171 | + it('transforms JSONL chunks in object mode', async () => { |
| 172 | + const transform = parser.createTransformStream({ |
| 173 | + filter: (obj) => obj.keep, |
| 174 | + }); |
| 175 | + |
| 176 | + const results: unknown[] = []; |
| 177 | + transform.on('data', (obj) => results.push(obj)); |
| 178 | + |
| 179 | + await new Promise<void>((resolve, reject) => { |
| 180 | + transform.write('{"keep":true,"v":1}\n{"keep":false,"v":2}\n'); |
| 181 | + transform.end(() => { |
| 182 | + resolve(); |
| 183 | + }); |
| 184 | + transform.on('error', reject); |
| 185 | + }); |
| 186 | + |
| 187 | + expect(results).toHaveLength(1); |
| 188 | + expect(results[0]).toEqual({ keep: true, v: 1 }); |
| 189 | + }); |
| 190 | + }); |
| 191 | +}); |