feat: chunked processing (streaming) #509
base: main
Conversation
Codecov Report

❌ Patch coverage is …

Coverage diff:

```diff
@@            Coverage Diff             @@
##             main     #509      +/-   ##
==========================================
+ Coverage   75.71%   81.26%   +5.54%
==========================================
  Files         115      120       +5
  Lines       11181    12865    +1684
  Branches      756      893     +137
==========================================
+ Hits         8466    10455    +1989
+ Misses       2711     2407     -304
+ Partials        4        3       -1
```
Pull request overview
This PR implements chunked processing with streaming capabilities using async generators, enabling pipeline parallelism where downstream generators can begin processing results as soon as chunks complete from their dependencies, rather than waiting for all processing to finish.
Key Changes:
- Introduces streaming utilities for async chunk processing
- Adds a `stream()` method to parallel workers alongside the existing `map()` method (see the sketch below)
- Converts all generator `generate()` functions to async generators that yield chunks
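A minimal sketch of the two consumption models, assuming a worker object that exposes the `map()` and `stream()` methods named above (the signatures are illustrative, not the PR's actual API):

```js
// Sketch only: method names follow the change list above; signatures and
// chunk shape are assumptions for illustration.
export async function runBatch(worker, entries) {
  // map(): resolves only once every chunk has been processed.
  return await worker.map(entries);
}

export async function* runStreamed(worker, entries) {
  // stream(): an async generator that yields each chunk as soon as a worker
  // thread finishes it, so downstream consumers can start work early.
  for await (const chunk of worker.stream(entries)) {
    yield chunk;
  }
}
```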
Reviewed changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `src/streaming.mjs` | New utility module providing streaming helpers including `yieldAsCompleted`, `collectAsyncGenerator`, and caching mechanisms |
| `src/threading/parallel.mjs` | Adds `stream()` method for chunk-by-chunk yielding; refactors utility functions to shared module |
| `src/generators.mjs` | Updates dependency resolution to handle async generators; adds streaming cache for collecting generator results |
| `src/generators/metadata/index.mjs` | Converts to async generator pattern with streaming support |
| `src/generators/legacy-json/index.mjs` | Converts to async generator pattern with streaming support |
| `src/generators/legacy-json-all/index.mjs` | Adds `processChunk()` implementation; converts to async generator with chunk aggregation |
| `src/generators/legacy-html/index.mjs` | Converts to async generator; extracts `replaceTemplateValues` to separate module |
| `src/generators/legacy-html-all/index.mjs` | Adds `processChunk()` implementation; converts to async generator; uses extracted `replaceTemplateValues` |
| `src/generators/legacy-html/utils/replaceTemplateValues.mjs` | New utility module extracted from legacy-html generator for template value replacement |
| `src/generators/web/index.mjs` | Adds `processChunk()` pass-through; converts to async generator while maintaining batch processing |
| `src/generators/jsx-ast/index.mjs` | Converts to async generator; extracts `getSortedHeadNodes` to separate module |
| `src/generators/jsx-ast/utils/getSortedHeadNodes.mjs` | New utility module extracted from jsx-ast generator for sorting head nodes |
| `src/generators/ast-js/index.mjs` | Converts to async generator with streaming support |
| `src/generators/api-links/__tests__/fixtures.test.mjs` | Updates test to consume async generator results |
| `bin/commands/generate.mjs` | Increases default chunk size from 10 to 20 items per worker thread |
I want to revise a few things before this PR is ready.
Pull request overview
Copilot reviewed 29 out of 33 changed files in this pull request and generated 10 comments.
Comments suppressed due to low confidence (1)
`src/generators/api-links/__tests__/fixtures.test.mjs:48`
- The test creates a WorkerPool but never terminates it. This can leave worker threads running after the test completes, potentially causing resource leaks or test hangs. Add `await pool.terminate();` after line 47 (before the closing brace of the test).
```js
const pool = new WorkerPool('../chunk-worker.mjs', cpus().length);
const worker = createParallelWorker('ast-js', pool, {
  threads: 1,
  chunkSize: 10,
});

// Collect results from the async generator
const astJsResults = [];
for await (const chunk of astJs.generate(undefined, {
  input: [sourceFile],
  worker,
})) {
  astJsResults.push(...chunk);
}

const actualOutput = await apiLinks.generate(astJsResults, {
  gitRef: 'https://github.com/nodejs/node/tree/HEAD',
});

for (const [k, v] of Object.entries(actualOutput)) {
  actualOutput[k] = v.replace(/.*(?=lib\/)/, '');
}

t.assert.snapshot(actualOutput);
});
```
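The suggested cleanup could look like the sketch below, which ensures the pool is torn down even if an assertion fails (assuming `WorkerPool` exposes the `terminate()` method the comment refers to):

```js
// Sketch of the suggested cleanup: tear the worker threads down even if an
// assertion above throws. terminate() is assumed from the review comment.
const pool = new WorkerPool('../chunk-worker.mjs', cpus().length);
try {
  // ...create the parallel worker, run the generators, and assert as above...
} finally {
  await pool.terminate();
}
```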
There seems to be a bug in the generators; investigating.
Description
This PR updates our generator implementation to allow streamed processing for generators that support `processChunk`, using async iterators (async generators and `yield`). This lets a generator eagerly start processing data as soon as at least one chunk from one of its dependencies has finished, and the streaming approach works for both our threaded and non-threaded models.

```mermaid
flowchart TB
    subgraph Main["Main Thread"]
        G[Generator Pipeline]
        SC[Streaming Cache]
    end
    subgraph Pool["Shared WorkerPool"]
        Q[Task Queue]
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker N...]
    end
    G -->|"schedules"| PW[ParallelWorker]
    PW -->|"chunks items"| Q
    Q -->|"distributes"| W1
    Q -->|"distributes"| W2
    Q -->|"distributes"| W3
    W1 -->|"yields results"| PW
    W2 -->|"yields results"| PW
    W3 -->|"yields results"| PW
    PW -->|"streams chunks"| G
    G -->|"caches collected"| SC
```
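As a rough sketch of that shape, a generator that implements both `processChunk` and an async `generate()` might look like the following (the object layout and the `toOutput()` helper are hypothetical illustrations, not the PR's code):

```js
// Illustrative generator shape; simplified relative to the real generators.
export default {
  name: 'example-generator',
  dependsOn: 'ast-js',

  // Worker-side entry point: process only the items at the given indices.
  async processChunk(fullInput, itemIndices, options) {
    return itemIndices.map(index => toOutput(fullInput[index], options));
  },

  // Main-thread orchestration: an async generator that yields each processed
  // chunk as soon as an upstream chunk is available, enabling pipeline
  // parallelism instead of waiting for the whole dependency to finish.
  async *generate(upstreamChunks, options) {
    for await (const chunk of upstreamChunks) {
      const indices = chunk.map((_, index) => index);
      yield await this.processChunk(chunk, indices, options);
    }
  },
};

// Hypothetical per-item transform used above.
function toOutput(item, options) {
  return { ...item, processedWith: options };
}
```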
WorkerPool & Parallel Spawning

The `WorkerPool` class (`src/threading/index.mjs`) manages a shared pool of reusable Node.js worker threads. Workers are spawned in parallel using `setImmediate()` to avoid sequential startup delays, then kept alive in an `idleWorkers` array for reuse across tasks. The pool uses a task queue, with `processQueue()` distributing work to available workers, and a single pool instance is shared across all generators via `sharedPool` in `generators.mjs` to minimize overhead.
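A heavily simplified sketch of that pooling pattern, assuming the `idleWorkers` array and `processQueue()` loop described above (illustrative only, not the PR's `WorkerPool`):

```js
// Illustrative pool sketch; error handling, worker messaging protocol, and
// busy-worker tracking are simplified.
import { Worker } from 'node:worker_threads';

export class SimpleWorkerPool {
  constructor(workerPath, size) {
    this.idleWorkers = [];
    this.taskQueue = [];
    // Spawn workers in parallel via setImmediate() so startup cost is not
    // paid sequentially before the first task can be queued.
    for (let i = 0; i < size; i++) {
      setImmediate(() => {
        this.idleWorkers.push(new Worker(workerPath));
        this.processQueue();
      });
    }
  }

  // Queue a task; it runs as soon as a worker becomes idle.
  run(task) {
    return new Promise((resolve, reject) => {
      this.taskQueue.push({ task, resolve, reject });
      this.processQueue();
    });
  }

  // Hand queued tasks to whichever workers are currently idle.
  processQueue() {
    while (this.idleWorkers.length > 0 && this.taskQueue.length > 0) {
      const worker = this.idleWorkers.pop();
      const { task, resolve, reject } = this.taskQueue.shift();
      const onMessage = result => {
        worker.off('error', onError);
        this.idleWorkers.push(worker); // keep the worker alive for reuse
        resolve(result);
        this.processQueue();
      };
      const onError = error => {
        worker.off('message', onMessage);
        reject(error);
      };
      worker.once('message', onMessage);
      worker.once('error', onError);
      worker.postMessage(task);
    }
  }

  // Tear down idle workers (busy workers omitted for brevity in this sketch).
  async terminate() {
    await Promise.all(this.idleWorkers.map(worker => worker.terminate()));
  }
}
```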
Chunk Distribution & Streaming Results

The `createParallelWorker` function (`src/threading/parallel.mjs`) splits input items into chunks using `createIndexChunks()`, with automatic size optimization via `optimalChunkSize = Math.ceil(itemCount / threads)`. Chunks are distributed to the pool, and results are yielded as they complete through `yieldAsCompleted()`, an async generator that races pending promises and yields each result immediately. For single-threaded mode (`threads <= 1`), processing falls back to the main thread by calling `processChunk` directly.
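The as-completed yielding can be sketched as follows (an illustrative reimplementation, not the actual `yieldAsCompleted()` in `src/streaming.mjs`):

```js
// Yield promise results in completion order rather than submission order.
export async function* yieldAsCompleted(promises) {
  // Tag each promise so the winner of Promise.race() can be removed.
  const pending = new Map(
    promises.map((promise, index) => [
      index,
      promise.then(result => ({ index, result })),
    ])
  );

  while (pending.size > 0) {
    // Race whatever is still pending and yield the first settled result.
    const { index, result } = await Promise.race(pending.values());
    pending.delete(index);
    yield result;
  }
}

// Chunk-size heuristic matching the formula quoted above.
export const optimalChunkSize = (itemCount, threads) =>
  Math.ceil(itemCount / threads);
```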
Streaming Cache & Dependency Resolution

The `createStreamingCache` utility (`src/streaming.mjs`) ensures that when multiple generators depend on the same async generator source, `collectAsyncGenerator()` runs only once and all dependents share the cached result via `getOrCollect()`. Generators implement `processChunk(fullInput, itemIndices, options)` for worker-side processing and `async *generate()` for main-thread orchestration, yielding chunks as they complete to enable pipeline parallelism throughout the generator dependency chain.
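A sketch of that share-one-collection idea, standing in for `createStreamingCache()` and `getOrCollect()` (illustrative, not the PR's implementation):

```js
// Collect each async generator at most once; all dependents await the same
// in-flight promise instead of re-draining the source.
export function createStreamingCache() {
  const cache = new Map();

  const getOrCollect = (key, asyncGenerator) => {
    if (!cache.has(key)) {
      cache.set(key, collectAsyncGenerator(asyncGenerator));
    }
    return cache.get(key);
  };

  return { getOrCollect };
}

// Drain an async generator of chunks into a single flat array.
export async function collectAsyncGenerator(asyncGenerator) {
  const collected = [];
  for await (const chunk of asyncGenerator) {
    collected.push(...chunk);
  }
  return collected;
}
```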
Logger Enhancements

The logger (`src/logger/logger.mjs`) now tracks child loggers in a `children` Set, with `setLogLevel()` propagating level changes to all descendants automatically. The console transport (`src/logger/transports/console.mjs`) displays metadata objects inline in magenta using `styleText('magenta', JSON.stringify(metadata))`, providing better visibility into structured log data during debugging.
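A minimal sketch of both behaviors, assuming a factory-style logger (the real code lives in `src/logger/logger.mjs` and `src/logger/transports/console.mjs`):

```js
// Illustrative logger: child tracking, level propagation, and inline
// magenta metadata in the console output.
import { styleText } from 'node:util';

export function createLogger(level = 'info') {
  const children = new Set();

  const logger = {
    level,
    child() {
      const childLogger = createLogger(logger.level);
      children.add(childLogger);
      return childLogger;
    },
    // Propagate level changes to every descendant automatically.
    setLogLevel(newLevel) {
      logger.level = newLevel;
      for (const childLogger of children) {
        childLogger.setLogLevel(newLevel);
      }
    },
    // Console transport: render metadata inline in magenta.
    info(message, metadata) {
      const suffix = metadata
        ? ` ${styleText('magenta', JSON.stringify(metadata))}`
        : '';
      console.log(`${message}${suffix}`);
    },
  };

  return logger;
}
```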