-
Notifications
You must be signed in to change notification settings - Fork 2
feat(worker): Add worker-initiated requests support #138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
taras
wants to merge
26
commits into
main
Choose a base branch
from
feature/worker-initiated-requests
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit: |
5157da1 to
2ae43a1
Compare
f437a8a to
3ba18eb
Compare
Add 14 new test cases and 9 test asset workers for the upcoming worker-initiated requests feature. This allows workers to send requests to the host, enabling bidirectional communication. Tests are written first (TDD) and will fail until implementation is complete. The PLAN-worker-requests.md contains the detailed implementation plan with all design decisions documented.
Implements the worker→host request/response pattern allowing workers to send requests to the host and receive responses. This enables use cases like tool sessions where workers need to call back to the host. Changes: - Add send() function to workerMain options for worker→host requests - Add forEach() method to WorkerResource for host to handle requests - Implement error serialization with cause chain for cross-boundary errors - Use race() pattern so forEach exits cleanly when worker closes - Add concurrent forEach guard with try/finally cleanup - Fix bidirectional worker test to spawn messages.forEach in background All 14 new tests pass along with existing tests.
Verifies that workers can call send() to the host while handling a message from the host inside messages.forEach. This tests the nested bidirectional communication pattern.
Clarifies that these type parameters are for worker-initiated requests, distinguishing them from the host-initiated TSend/TRecv types. Also replaces race() with spawn() for the request processor loop to improve compatibility with effection 3.0.0.
Ensures compatibility with effection 3.0.0 on macOS by explicitly halting the request processor task when the outcome resolves, rather than relying on implicit scope cleanup.
Replace signal/each pattern with a callback-based approach that avoids the race condition in effection v3. Instead of spawning a processor task that blocks on a signal iterator, we now: 1. Queue requests that arrive before forEach is called 2. Set a handler callback when forEach is active 3. Dispatch requests via scope.run() from the message handler 4. Clear the handler when forEach exits This eliminates the need to halt a blocking iterator, which had inconsistent behavior between effection v3 and v4.
Add two complementary channel primitives for request-response communication: - useChannelResponse: Requester side - creates MessageChannel, returns port to transfer and operation to await response (with automatic ACK) - useChannelRequest: Responder side - wraps received port, provides resolve/reject operations that wait for ACK before closing Features: - ACK mechanism guarantees response delivery before port cleanup - Simple validation throws on unexpected messages - Proper resource cleanup with finally blocks - Full test coverage (9 tests)
Plan for integrating useChannelResponse/useChannelRequest into worker implementation. Includes prerequisite fix for cancellation handling: - Race ACK wait against port close event - Prevents responder from hanging when requester is cancelled
- Add SerializedResult<T> type to fix error typing mismatch - Clarify that ACK is sent for both Ok and Err responses - Update all code examples to use SerializedOk/SerializedErr - Note that resolve() is used for both success/error (reject is for Operation errors)
Explicitly document that useChannelResponse and useChannelRequest handle port.start() and port.close() internally, so removing these calls from worker code is intentional, not a regression.
Add tests to plan for: - ACK sent/received on error path (not just success) - Port closes if responder exits without calling resolve/reject - Port closes if responder throws before responding
…internally
- resolve(value) wraps in { ok: true, value } internally
- reject(error) serializes and wraps in { ok: false, error } internally
- Callers use natural resolve/reject semantics
- Channel primitive handles SerializedResult wrapping
- Remove SerializedOk/SerializedErr helper functions from exports
- ChannelResponse.operation returns SerializedResult<T>
- Use withResolvers for synchronization instead of sleep - Add tests for requester cancellation while waiting - Add tests for requester scope exit without calling operation - All tests use explicit signals for coordination
- Add SerializedResult<T> type for type-safe cross-boundary error handling - Update useChannelResponse with close detection and optional timeout - Update useChannelRequest to wrap responses in SerializedResult internally - Replace manual MessageChannel usage in worker.ts and worker-main.ts - Add @effectionx/timebox dependency for timeout support - Add comprehensive tests for close detection, timeout, and cancellation - Add .opencode/plans/ to .gitignore
- Remove redundant port.close() from resolve/reject (finally handles it) - Document reject() as application-level error, not transport error - Document close event runtime behavior across Node.js/browser/Deno - Refactor fragile cancellation test to use raw postMessage for precise signaling
Make ChannelResponse<T> extend Operation<SerializedResult<T>> so callers
can yield the response object directly instead of using a separate
.operation property.
Before:
const { port, operation } = yield* useChannelResponse<T>();
const result = yield* operation;
After:
const response = yield* useChannelResponse<T>();
const result = yield* response;
3ba18eb to
f7e2e67
Compare
Extend channel primitives to support bidirectional progress streaming: - useChannelResponse: add 'progress' property returning Subscription - useChannelRequest: add 'progress()' method with backpressure (ACK) - forEach: pass ForEachContext with progress() to handler - send.stream<TProgress>(): receive progress updates from host Wire protocol extended with progress/progress_ack message types. All tests comply with no-sleep-test-sync policy using withResolvers() for deterministic synchronization. Tests: 53 pass (14 new progress streaming tests)
- Add progress streaming section to README with examples - Document true backpressure: host blocks until worker calls next() - Add test proving backpressure behavior (host waits for worker readiness)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
The current
@effectionx/workerpackage only supports host-to-worker communication: the host can send messages to workers and receive responses. However, some use cases require the reverse direction - workers initiating requests to the host.A concrete example is the sweatpants project's web worker transport for MCP tool sessions. In this architecture:
This is the opposite of the current pattern where the host is always the initiator.
What's New
Progress Streaming Support
This PR now includes bidirectional progress streaming - the host can send progress updates back to the worker during request processing:
Worker side (receive progress):
Host side (send progress):
Backpressure Semantics
The
progress()method implements true backpressure:ctx.progress()blocks until the worker callssubscription.next()next()calls, the host remains blockedThis ensures the worker is never overwhelmed with progress updates. The ACK is sent inside
subscription.next(), so the host waits for the worker to be ready for the next value.Key Features
Approach
This PR adds symmetric bidirectional communication by introducing:
New Worker-Side API:
send()functionWorkers receive a
sendfunction in theirworkerMainoptions:New Host-Side API:
worker.forEach()methodHosts can handle worker requests using
forEach:Channel Primitives
Introduces reusable channel primitives for request-response over MessageChannel:
useChannelResponse<TResponse, TProgress>(options?)- Requester sideyield* response- waits for response, ignores progressyield* response.progress- returns Subscription that yields progress then response@effectionx/timeboxSerializedResult<T>for type-safe error handlinguseChannelRequest<TResponse, TProgress>(port)- Responder sideresolve(value),reject(error), andprogress(data)operationsprogress()sends update and waits for ACK (true backpressure)SerializedResult)Wire Protocol
Messages sent over channel (Host → Worker):
Acknowledgements (Worker → Host):
Key Design Decisions
{ ok: true, value }or{ ok: false, error: SerializedError }for cross-boundary communication{ name, message, stack }and wrapped withcauseon the receiving sideuseChannelResponseusing@effectionx/timeboxforEach()is called are queued (unbounded)forEach()call allowed at a time"open"viawithResolvers, and ensures outcome settles during teardown if the close message arrives latenext(), so host waits for worker readinessType Parameter Naming
TSend/TRecv- Host-initiated communication (host sends, worker receives/responds)WRequest/WResponse- Worker-initiated communication (worker sends, host receives/responds)WProgress- Progress updates from host to workerTests
All 54 worker tests pass (4 skipped):
Channel primitive tests (32 tests):
yield* responseignores progressWorker-initiated request tests (15 tests):
Policy Compliance
Tests comply with the No-Sleep Test Synchronization Policy:
withResolvers()for callback synchronizationsleep(0)only for yielding controlsleep(ms)only inside spawned tasks to trigger conditionsRelated Issues