Skip to content

Conversation

@taras
Copy link
Member

@taras taras commented Jan 31, 2026

Motivation

The current @effectionx/worker package only supports host-to-worker communication: the host can send messages to workers and receive responses. However, some use cases require the reverse direction - workers initiating requests to the host.

A concrete example is the sweatpants project's web worker transport for MCP tool sessions. In this architecture:

  • Tools run in web workers (the "Principal" that initiates requests)
  • The host responds to tool requests (the "Operative")

This is the opposite of the current pattern where the host is always the initiator.

What's New

Progress Streaming Support

This PR now includes bidirectional progress streaming - the host can send progress updates back to the worker during request processing:

Worker side (receive progress):

await workerMain<never, never, string, void, string, string>(
  function* ({ send }) {
    // Simple request/response (unchanged)
    const response = yield* send("hello");
    
    // With progress streaming
    const subscription = yield* send.stream<number>("process");
    let next = yield* subscription.next();
    while (!next.done) {
      console.log("Progress:", next.value);  // 25, 50, 75
      next = yield* subscription.next();
    }
    const response = next.value;  // "done"
  }
);

Host side (send progress):

yield* worker.forEach<string, string, number>(function* (request, ctx) {
  yield* ctx.progress(25);
  yield* ctx.progress(50);
  yield* ctx.progress(75);
  return "done";
});

Backpressure Semantics

The progress() method implements true backpressure:

  • ctx.progress() blocks until the worker calls subscription.next()
  • The host cannot send progress faster than the worker can receive it
  • If the worker does async work between next() calls, the host remains blocked

This ensures the worker is never overwhelmed with progress updates. The ACK is sent inside subscription.next(), so the host waits for the worker to be ready for the next value.

Key Features

  • True backpressure: Host blocks until worker is ready for next progress
  • Backwards compatible: Existing code without progress continues to work
  • Type-safe: Progress type is a separate type parameter

Approach

This PR adds symmetric bidirectional communication by introducing:

New Worker-Side API: send() function

Workers receive a send function in their workerMain options:

await workerMain<never, never, string, void, string, string>(
  function* ({ send }) {
    const response = yield* send("hello");  // Worker initiates request
    return `received: ${response}`;
  }
);

New Host-Side API: worker.forEach() method

Hosts can handle worker requests using forEach:

const worker = yield* useWorker<never, never, string, void>(workerUrl, opts);

const result = yield* worker.forEach<string, string>(function* (request) {
  return `echo: ${request}`;  // Host handles worker's request
});

Channel Primitives

Introduces reusable channel primitives for request-response over MessageChannel:

  • useChannelResponse<TResponse, TProgress>(options?) - Requester side

    • Creates MessageChannel, returns port to transfer and operation to await response
    • yield* response - waits for response, ignores progress
    • yield* response.progress - returns Subscription that yields progress then response
    • Races message vs close events (detects responder crash)
    • Optional timeout via @effectionx/timebox
    • Returns SerializedResult<T> for type-safe error handling
  • useChannelRequest<TResponse, TProgress>(port) - Responder side

    • Wraps received port with resolve(value), reject(error), and progress(data) operations
    • progress() sends update and waits for ACK (true backpressure)
    • Handles serialization internally (wraps in SerializedResult)
    • Races ACK vs close events (detects requester cancellation)

Wire Protocol

Messages sent over channel (Host → Worker):

| { type: "progress"; data: TProgress }
| { type: "response"; result: SerializedResult<TResponse> }

Acknowledgements (Worker → Host):

| { type: "ack" }           // After response
| { type: "progress_ack" }  // After each progress (sent inside next())

Key Design Decisions

  1. MessageChannel per request - Each worker request creates a MessageChannel for correlation (no IDs needed)
  2. SerializedResult type - Type-safe { ok: true, value } or { ok: false, error: SerializedError } for cross-boundary communication
  3. Error serialization - Errors are serialized as { name, message, stack } and wrapped with cause on the receiving side
  4. ACK mechanism - Responses require acknowledgment before cleanup, guaranteeing delivery
  5. Close detection - Both sides detect if the other crashes/cancels via MessagePort close events
  6. Timeout support - Optional timeout parameter on useChannelResponse using @effectionx/timebox
  7. Request queueing - Requests sent before forEach() is called are queued (unbounded)
  8. Concurrent forEach guard - Only one forEach() call allowed at a time
  9. Message loop aligned with resource lifecycle - Uses a pre-attached subscription, resolves the initial "open" via withResolvers, and ensures outcome settles during teardown if the close message arrives late
  10. True backpressure - Progress ACK is sent inside next(), so host waits for worker readiness

Type Parameter Naming

  • TSend/TRecv - Host-initiated communication (host sends, worker receives/responds)
  • WRequest/WResponse - Worker-initiated communication (worker sends, host receives/responds)
  • WProgress - Progress updates from host to worker

Tests

All 54 worker tests pass (4 skipped):

Channel primitive tests (32 tests):

  • Basic channel creation and round-trip
  • ACK verification
  • Close detection (responder crashes/exits)
  • Timeout behavior (slow responder, fast responder)
  • Cancellation handling (requester cancels, responder exits)
  • Progress streaming tests (15 tests):
    • Receives multiple progress then response
    • yield* response ignores progress
    • Error response after progress
    • Port close during progress
    • Backpressure verification (host blocks until worker ready)
    • Order preservation
    • Complex progress data
    • Zero progress updates
    • Requester cancellation during progress
    • Timeout applies to entire exchange

Worker-initiated request tests (15 tests):

  • Single request handling
  • Multiple sequential requests
  • Concurrent requests from worker
  • Error propagation with cause chain
  • Bidirectional communication
  • Request queueing before forEach
  • Concurrent forEach guard
  • Nested send() inside messages.forEach handler

Policy Compliance

Tests comply with the No-Sleep Test Synchronization Policy:

  • Uses withResolvers() for callback synchronization
  • Uses sleep(0) only for yielding control
  • sleep(ms) only inside spawned tasks to trigger conditions

Related Issues

@pkg-pr-new
Copy link

pkg-pr-new bot commented Jan 31, 2026

Open in StackBlitz

npm i https://pkg.pr.new/thefrontside/effectionx/@effectionx/worker@138

commit: 899bedd

@taras taras force-pushed the feature/worker-initiated-requests branch from 5157da1 to 2ae43a1 Compare January 31, 2026 19:19
@taras taras requested a review from cowboyd January 31, 2026 20:05
@taras taras force-pushed the feature/worker-initiated-requests branch from f437a8a to 3ba18eb Compare February 1, 2026 15:28
taras added 19 commits February 1, 2026 10:31
Add 14 new test cases and 9 test asset workers for the upcoming
worker-initiated requests feature. This allows workers to send
requests to the host, enabling bidirectional communication.

Tests are written first (TDD) and will fail until implementation
is complete. The PLAN-worker-requests.md contains the detailed
implementation plan with all design decisions documented.
Implements the worker→host request/response pattern allowing workers
to send requests to the host and receive responses. This enables use
cases like tool sessions where workers need to call back to the host.

Changes:
- Add send() function to workerMain options for worker→host requests
- Add forEach() method to WorkerResource for host to handle requests
- Implement error serialization with cause chain for cross-boundary errors
- Use race() pattern so forEach exits cleanly when worker closes
- Add concurrent forEach guard with try/finally cleanup
- Fix bidirectional worker test to spawn messages.forEach in background

All 14 new tests pass along with existing tests.
Verifies that workers can call send() to the host while handling
a message from the host inside messages.forEach. This tests the
nested bidirectional communication pattern.
Clarifies that these type parameters are for worker-initiated requests,
distinguishing them from the host-initiated TSend/TRecv types.

Also replaces race() with spawn() for the request processor loop to
improve compatibility with effection 3.0.0.
Ensures compatibility with effection 3.0.0 on macOS by explicitly
halting the request processor task when the outcome resolves, rather
than relying on implicit scope cleanup.
Replace signal/each pattern with a callback-based approach that avoids
the race condition in effection v3. Instead of spawning a processor
task that blocks on a signal iterator, we now:

1. Queue requests that arrive before forEach is called
2. Set a handler callback when forEach is active
3. Dispatch requests via scope.run() from the message handler
4. Clear the handler when forEach exits

This eliminates the need to halt a blocking iterator, which had
inconsistent behavior between effection v3 and v4.
Add two complementary channel primitives for request-response communication:

- useChannelResponse: Requester side - creates MessageChannel, returns port
  to transfer and operation to await response (with automatic ACK)
- useChannelRequest: Responder side - wraps received port, provides
  resolve/reject operations that wait for ACK before closing

Features:
- ACK mechanism guarantees response delivery before port cleanup
- Simple validation throws on unexpected messages
- Proper resource cleanup with finally blocks
- Full test coverage (9 tests)
Plan for integrating useChannelResponse/useChannelRequest into worker
implementation. Includes prerequisite fix for cancellation handling:
- Race ACK wait against port close event
- Prevents responder from hanging when requester is cancelled
- Add SerializedResult<T> type to fix error typing mismatch
- Clarify that ACK is sent for both Ok and Err responses
- Update all code examples to use SerializedOk/SerializedErr
- Note that resolve() is used for both success/error (reject is for Operation errors)
Explicitly document that useChannelResponse and useChannelRequest
handle port.start() and port.close() internally, so removing these
calls from worker code is intentional, not a regression.
Add tests to plan for:
- ACK sent/received on error path (not just success)
- Port closes if responder exits without calling resolve/reject
- Port closes if responder throws before responding
…internally

- resolve(value) wraps in { ok: true, value } internally
- reject(error) serializes and wraps in { ok: false, error } internally
- Callers use natural resolve/reject semantics
- Channel primitive handles SerializedResult wrapping
- Remove SerializedOk/SerializedErr helper functions from exports
- ChannelResponse.operation returns SerializedResult<T>
- Use withResolvers for synchronization instead of sleep
- Add tests for requester cancellation while waiting
- Add tests for requester scope exit without calling operation
- All tests use explicit signals for coordination
- Add SerializedResult<T> type for type-safe cross-boundary error handling
- Update useChannelResponse with close detection and optional timeout
- Update useChannelRequest to wrap responses in SerializedResult internally
- Replace manual MessageChannel usage in worker.ts and worker-main.ts
- Add @effectionx/timebox dependency for timeout support
- Add comprehensive tests for close detection, timeout, and cancellation
- Add .opencode/plans/ to .gitignore
- Remove redundant port.close() from resolve/reject (finally handles it)
- Document reject() as application-level error, not transport error
- Document close event runtime behavior across Node.js/browser/Deno
- Refactor fragile cancellation test to use raw postMessage for precise signaling
Make ChannelResponse<T> extend Operation<SerializedResult<T>> so callers
can yield the response object directly instead of using a separate
.operation property.

Before:
  const { port, operation } = yield* useChannelResponse<T>();
  const result = yield* operation;

After:
  const response = yield* useChannelResponse<T>();
  const result = yield* response;
@taras taras force-pushed the feature/worker-initiated-requests branch from 3ba18eb to f7e2e67 Compare February 1, 2026 15:36
taras added 6 commits February 1, 2026 10:41
Extend channel primitives to support bidirectional progress streaming:

- useChannelResponse: add 'progress' property returning Subscription
- useChannelRequest: add 'progress()' method with backpressure (ACK)
- forEach: pass ForEachContext with progress() to handler
- send.stream<TProgress>(): receive progress updates from host

Wire protocol extended with progress/progress_ack message types.
All tests comply with no-sleep-test-sync policy using withResolvers()
for deterministic synchronization.

Tests: 53 pass (14 new progress streaming tests)
- Add progress streaming section to README with examples
- Document true backpressure: host blocks until worker calls next()
- Add test proving backpressure behavior (host waits for worker readiness)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant