Skip to content

Conversation

@ovflowd
Copy link
Member

@ovflowd ovflowd commented Dec 7, 2025

Description

This PR updates our generator implementation by allowing streamed processing for generators that support processChunk by using Async Iterators (Async Generator) (yield) which allows generators to eagerly start processing data as soon as at least one chunk from one of its dependencies finishes processing the data. The streaming approach works both for our threaded and non-threaded model.

flowchart TB
    subgraph Main["Main Thread"]
        G[Generator Pipeline]
        SC[Streaming Cache]
    end
    
    subgraph Pool["Shared WorkerPool"]
        Q[Task Queue]
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker N...]
    end
    
    G -->|"schedules"| PW[ParallelWorker]
    PW -->|"chunks items"| Q
    Q -->|"distributes"| W1
    Q -->|"distributes"| W2
    Q -->|"distributes"| W3
    
    W1 -->|"yields results"| PW
    W2 -->|"yields results"| PW
    W3 -->|"yields results"| PW
    
    PW -->|"streams chunks"| G
    G -->|"caches collected"| SC
Loading

WorkerPool & Parallel Spawning

The WorkerPool class (src/threading/index.mjs) manages a shared pool of reusable Node.js worker threads. Workers are spawned in parallel using setImmediate() to avoid sequential startup delays, then kept alive in an idleWorkers array for reuse across tasks. The pool uses a task queue with processQueue() distributing work to available workers, and a single pool instance is shared across all generators via sharedPool in generators.mjs to minimize overhead.

Chunk Distribution & Streaming Results

The createParallelWorker function (src/threading/parallel.mjs) splits input items into chunks using createIndexChunks(), with automatic size optimization via optimalChunkSize = Math.ceil(itemCount / threads). Chunks are distributed to the pool and results are yielded as-completed through yieldAsCompleted(), an async generator that races pending promises and yields each result immediately. For single-threaded mode (threads <= 1), processing falls back to the main thread by calling processChunk directly.

Streaming Cache & Dependency Resolution

The createStreamingCache (src/streaming.mjs) ensures that when multiple generators depend on the same async generator source, collectAsyncGenerator() runs only once and all dependents share the cached result via getOrCollect(). Generators implement processChunk(fullInput, itemIndices, options) for worker-side processing and async *generate() for main-thread orchestration, yielding chunks as they complete to enable pipeline parallelism throughout the generator dependency chain.

Logger Enhancements

The logger (src/logger/logger.mjs) now tracks child loggers in a children Set, with setLogLevel() propagating level changes to all descendants automatically. The console transport (src/logger/transports/console.mjs) displays metadata objects inline in magenta using styleText('magenta', JSON.stringify(metadata)), providing better visibility into structured log data during debugging.

@ovflowd ovflowd requested a review from a team as a code owner December 7, 2025 17:43
Copilot AI review requested due to automatic review settings December 7, 2025 17:43
@vercel
Copy link

vercel bot commented Dec 7, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Updated (UTC)
api-docs-tooling Ready Ready Preview Dec 8, 2025 6:58pm

@codecov
Copy link

codecov bot commented Dec 7, 2025

Codecov Report

❌ Patch coverage is 91.85434% with 170 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.26%. Comparing base (26d5760) to head (2c6a8ac).
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
src/generators/legacy-html-all/index.mjs 7.14% 26 Missing ⚠️
...rators/legacy-html/utils/replaceTemplateValues.mjs 50.94% 26 Missing ⚠️
bin/cli.mjs 0.00% 17 Missing ⚠️
src/threading/index.mjs 91.70% 17 Missing ⚠️
...rc/generators/jsx-ast/utils/getSortedHeadNodes.mjs 55.55% 16 Missing ⚠️
src/generators/legacy-html/index.mjs 48.38% 16 Missing ⚠️
src/generators/legacy-json/index.mjs 52.00% 12 Missing ⚠️
src/__tests__/streaming.test.mjs 97.41% 6 Missing ⚠️
src/generators/jsx-ast/index.mjs 64.28% 5 Missing ⚠️
src/generators/legacy-json-all/index.mjs 64.28% 5 Missing ⚠️
... and 8 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #509      +/-   ##
==========================================
+ Coverage   75.71%   81.26%   +5.54%     
==========================================
  Files         115      120       +5     
  Lines       11181    12865    +1684     
  Branches      756      893     +137     
==========================================
+ Hits         8466    10455    +1989     
+ Misses       2711     2407     -304     
+ Partials        4        3       -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements chunked processing with streaming capabilities using async generators, enabling pipeline parallelism where downstream generators can begin processing results as soon as chunks complete from their dependencies, rather than waiting for all processing to finish.

Key Changes:

  • Introduces streaming utilities for async chunk processing
  • Adds stream() method to parallel workers alongside existing map() method
  • Converts all generator generate() functions to async generators that yield chunks

Reviewed changes

Copilot reviewed 14 out of 15 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/streaming.mjs New utility module providing streaming helpers including yieldAsCompleted, collectAsyncGenerator, and caching mechanisms
src/threading/parallel.mjs Adds stream() method for chunk-by-chunk yielding; refactors utility functions to shared module
src/generators.mjs Updates dependency resolution to handle async generators; adds streaming cache for collecting generator results
src/generators/metadata/index.mjs Converts to async generator pattern with streaming support
src/generators/legacy-json/index.mjs Converts to async generator pattern with streaming support
src/generators/legacy-json-all/index.mjs Adds processChunk() implementation; converts to async generator with chunk aggregation
src/generators/legacy-html/index.mjs Converts to async generator; extracts replaceTemplateValues to separate module
src/generators/legacy-html-all/index.mjs Adds processChunk() implementation; converts to async generator; uses extracted replaceTemplateValues
src/generators/legacy-html/utils/replaceTemplateValues.mjs New utility module extracted from legacy-html generator for template value replacement
src/generators/web/index.mjs Adds processChunk() pass-through; converts to async generator while maintaining batch processing
src/generators/jsx-ast/index.mjs Converts to async generator; extracts getSortedHeadNodes to separate module
src/generators/jsx-ast/utils/getSortedHeadNodes.mjs New utility module extracted from jsx-ast generator for sorting head nodes
src/generators/ast-js/index.mjs Converts to async generator with streaming support
src/generators/api-links/__tests__/fixtures.test.mjs Updates test to consume async generator results
bin/commands/generate.mjs Increases default chunk size from 10 to 20 items per worker thread

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@ovflowd ovflowd marked this pull request as draft December 7, 2025 19:04
@ovflowd
Copy link
Member Author

ovflowd commented Dec 7, 2025

I want to revise a few things before this PR is ready.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 29 out of 33 changed files in this pull request and generated 10 comments.

Comments suppressed due to low confidence (1)

src/generators/api-links/tests/fixtures.test.mjs:48

  • The test creates a WorkerPool but never terminates it. This can leave worker threads running after the test completes, potentially causing resource leaks or test hangs. Add await pool.terminate(); after line 47 (before the closing brace of the test).
        const pool = new WorkerPool('../chunk-worker.mjs', cpus().length);

        const worker = createParallelWorker('ast-js', pool, {
          threads: 1,
          chunkSize: 10,
        });

        // Collect results from the async generator
        const astJsResults = [];

        for await (const chunk of astJs.generate(undefined, {
          input: [sourceFile],
          worker,
        })) {
          astJsResults.push(...chunk);
        }

        const actualOutput = await apiLinks.generate(astJsResults, {
          gitRef: 'https://github.com/nodejs/node/tree/HEAD',
        });

        for (const [k, v] of Object.entries(actualOutput)) {
          actualOutput[k] = v.replace(/.*(?=lib\/)/, '');
        }

        t.assert.snapshot(actualOutput);
      });

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@ovflowd ovflowd marked this pull request as draft December 8, 2025 18:26
@ovflowd
Copy link
Member Author

ovflowd commented Dec 8, 2025

There seems a bug on the generators, investigating.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants