feat: add @effectionx/durable-streams - Durable generator-based workflows with Durable Streams and structured concurrency #179
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in your CodeRabbit configuration. Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 Walkthrough

Adds the `@effectionx/durable-streams` package.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
  actor User
  participant durableRun as durableRun
  participant Stream as DurableStream
  participant Index as ReplayIndex
  participant Guard as ReplayGuard
  participant Effect as DurableEffect
  participant Workflow as Workflow
  User->>durableRun: durableRun(workflow, {stream})
  durableRun->>Stream: readAll()
  Stream-->>durableRun: events[]
  durableRun->>Index: build ReplayIndex(events)
  durableRun->>Guard: run check(event...) (pre-workflow I/O)
  durableRun->>Workflow: start execution
  Workflow->>Effect: yield DurableEffect
  alt replayable
    Effect->>Index: peekYield(coroutineId)
    Index-->>Effect: cached Yield
    Effect->>Guard: decide(cached Yield)
    Guard-->>Effect: {replay} or {error}
    alt replay
      Effect-->>Workflow: resolve with cached result
    else error
      Effect->>Workflow: take live path
    end
  else live
    Effect->>Workflow: execute operation
    Workflow-->>Effect: result
    Effect->>Stream: append Yield(event)
    Stream-->>Effect: persisted
    Effect-->>Workflow: resolve
  end
  Workflow-->>durableRun: completes
  durableRun->>Stream: append Close(ok|err)
  Stream-->>durableRun: persisted
  durableRun-->>User: return/throw
```
```mermaid
sequenceDiagram
  participant Caller
  participant durableAll as durableAll
  participant runChild as runDurableChild
  participant ChildA as Child A
  participant ChildB as Child B
  participant Journal as DurableStream
  Caller->>durableAll: durableAll([A,B])
  durableAll->>runChild: spawn child A (id root.0)
  durableAll->>runChild: spawn child B (id root.1)
  runChild->>ChildA: execute with DurableCtx
  runChild->>ChildB: execute with DurableCtx
  par Child executions
    ChildA->>Journal: append Yield/Close for root.0
    ChildB->>Journal: append Yield/Close for root.1
  and
    ChildA-->>runChild: result A
    ChildB-->>runChild: result B
  end
  runChild-->>durableAll: child results
  durableAll-->>Caller: aggregated [A,B]
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers
Important: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 error, 1 warning)
✅ Passed checks (2 passed)
✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 24
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durable-streams/combinators.ts`:
- Around line 60-84: The fast-path that returns a cached Close from replayIndex
via replayIndex.hasClose/getClose bypasses the replay guard decision logic, so
before returning the stored result (or throwing/desuspending) call the same
guard check used during replay (e.g., invoke checkReplay()/ReplayGuard.decide()
or the same check path runCheckPhase() uses) to validate the cached Yield/Close
events; specifically, in the early-return branch around
replayIndex.hasClose(childId) (and before using closeEvent.result) run the guard
decision and if it vetoes, fall through to normal replay handling (do not reuse
the cached Close), otherwise proceed to return/throw/suspend as currently
implemented.
In `@durable-streams/divergence-api.test.ts`:
- Around line 151-183: The test only exercises the middleware-enabled scope; add
a second run on a fresh scope that does not call scope.around(Divergence, ...)
and assert it fails with a DivergenceError to ensure middleware does not leak.
Specifically: allocate a new scope via yield* useScope() (e.g., scope2), do NOT
call scope2.around(...), run durableRun (using the same makeEvents() stream or a
fresh InMemoryStream(makeEvents())) and assert the run throws or rejects with
DivergenceError (referencing DivergenceError) rather than succeeding; keep the
existing first run (scope1 with scope1.around) unchanged so both behaviors are
verified.
- Around line 70-82: The test currently only constructs
ContinuePastCloseDivergenceError directly; instead drive a real replay through
the runtime to hit the continue-after-close path and assert the runtime throws
that error type. Build a minimal recorded journal that records a close at the
expected yieldCount (e.g., close after 2 yields) but then invoke the module's
replay/run API so the generator actually continues yielding; call the runtime
function used elsewhere in tests (the replay/run function that consumes
journals) and wrap the call in expect(...).toThrow/try-catch to assert the
thrown error is an instance of ContinuePastCloseDivergenceError and has the
expected properties (coroutineId, yieldCount, message). Ensure the test keeps
the original description and replaces the new ContinuePastCloseDivergenceError
instantiation assertions with the runtime-driven assertions.
In `@durable-streams/each.ts`:
- Around line 81-82: The module-global activeState (typed
DurableEachState<Json>) must be made scope-local so concurrent
spawn()/durableRun() calls or child scopes don’t collide; replace the top-level
activeState singleton with state resolved from the current scope/coroutine
(e.g., store state on the current Effection context or pass it through
durableEach/durableRun spawn calls) and update durableEach.next() (and related
logic referenced by the DurableEachState usage at the other locations) to look
up/receive the scope-owned state instead of reading the global variable; ensure
all places that set/check activeState (including the sections referenced around
the other usages) are updated to use the per-scope retrieval mechanism so
nesting and child-scope behavior work correctly.
In `@durable-streams/effect.ts`:
- Around line 97-101: The comparison currently only checks
entry.description.type and .name, which misses other identity fields on
EffectDescription; update the check to perform a full deep-equality comparison
between entry.description and the current desc (e.g., use a canonical serializer
or deepEqual) so any path/URL/version/metadata differences cause divergence, and
ensure ReplayGuard (where entry.description is passed) is given or compares
against the current desc rather than only entry.description so both sides are
visible for the check.
- Around line 213-267: persistAndResolve can run multiple times (executor calls,
throws, or post-teardown callbacks) causing duplicate yields and scope.run after
cancellation; add a local boolean guard (e.g., let settled = false) and
early-return from persistAndResolve if settled, then set settled = true the
first time you proceed (before calling routine.scope.run or resolving), ensure
the teardown wrapper also sets settled = true before calling the underlying
teardown() so late callbacks are ignored, and initialize teardown safely (noop
fallback) so execute assignment and subsequent calls are robust; update
references to persistAndResolve, teardown, execute, routine.scope.run, and
ctx.stream.append accordingly.
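The single-settlement guard this comment asks for is a small state machine; the sketch below shows the pattern in isolation. `makeSettler`, `persist`, and `resolve` are illustrative stand-ins, not the package's actual API, and the real `persistAndResolve` would also drive `routine.scope.run` and `ctx.stream.append`:

```typescript
// Minimal sketch of a "settle exactly once" guard. The first call to
// persistAndResolve wins; later calls (and any callback arriving after
// teardown) are silently ignored.
function makeSettler(
  persist: (value: unknown) => void,
  resolve: (value: unknown) => void,
) {
  let settled = false; // flips on first settlement
  let teardown: () => void = () => {}; // safe no-op fallback

  function persistAndResolve(value: unknown): void {
    if (settled) return; // duplicate executor calls are dropped
    settled = true;
    persist(value);
    resolve(value);
  }

  function setTeardown(fn: () => void): void {
    teardown = () => {
      settled = true; // callbacks that fire after teardown are ignored
      fn();
    };
  }

  return { persistAndResolve, setTeardown, runTeardown: () => teardown() };
}
```

The first settlement wins; anything arriving after settlement or teardown becomes a no-op instead of a duplicate journal append.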
In `@durable-streams/errors.ts`:
- Around line 50-52: The two divergence subclasses override the name to the same
string which loses class identity; update EarlyReturnDivergenceError and
ContinuePastCloseDivergenceError so each sets a unique name (e.g.,
"EarlyReturnDivergenceError" and "ContinuePastCloseDivergenceError") instead of
"DivergenceError", ensuring error.name, stack traces and serialization reflect
the concrete subclass; locate the name assignments in the classes
(EarlyReturnDivergenceError and ContinuePastCloseDivergenceError) and change
them accordingly.
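A minimal sketch of the suggested fix follows; the constructor fields are illustrative guesses at the error payloads, not the module's actual signatures:

```typescript
// Each subclass keeps its own `name`, so error.name, stack traces, and
// serialized output identify the concrete class instead of the base class.
class DivergenceError extends Error {
  override name = "DivergenceError";
}

class EarlyReturnDivergenceError extends DivergenceError {
  override name = "EarlyReturnDivergenceError";
  constructor(
    public coroutineId: string,
    public cursor: number,
    public totalYields: number,
  ) {
    super(`coroutine ${coroutineId} returned after ${cursor}/${totalYields} recorded yields`);
  }
}

class ContinuePastCloseDivergenceError extends DivergenceError {
  override name = "ContinuePastCloseDivergenceError";
  constructor(public coroutineId: string, public yieldCount: number) {
    super(`coroutine ${coroutineId} yielded past its recorded close (after ${yieldCount} yields)`);
  }
}
```

`instanceof DivergenceError` still matches both subclasses, so existing catch sites keep working while logs gain the precise class name.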
In `@durable-streams/file-guard.ts`:
- Around line 30-33: computeFileHash returns raw hex which mismatches callers
expecting the documented "sha256:..." shape and causes false StaleInputError;
change computeFileHash to normalize its output to the documented form (e.g.,
return `sha256:${hex}`) and ensure any comparison logic that checks stored
contentHash also normalizes inputs (or accepts both raw hex and prefixed forms)
so comparisons between computeFileHash and stored contentHash succeed; update
other related places in this file (the code that constructs/verifies contentHash
around the later block referenced) to use the same normalized "sha256:..."
format.
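The normalization can be sketched as below; `computeContentHash`, `normalizeHash`, and `hashesMatch` are illustrative names, not the module's real exports:

```typescript
import { createHash } from "node:crypto";

// Hashes are always produced in the documented "sha256:<hex>" shape, and
// comparisons normalize both sides so raw-hex values stored by older code
// still match.
function computeContentHash(content: string): string {
  const hex = createHash("sha256").update(content).digest("hex");
  return `sha256:${hex}`; // normalized form, never raw hex
}

function normalizeHash(hash: string): string {
  // Accept both raw hex and already-prefixed values when comparing.
  return hash.startsWith("sha256:") ? hash : `sha256:${hash}`;
}

function hashesMatch(stored: string, current: string): boolean {
  return normalizeHash(stored) === normalizeHash(current);
}
```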
In `@durable-streams/http-stream.ts`:
- Around line 85-88: The code builds streamUrl by concatenating opts.baseUrl and
opts.streamId directly; change this to construct the URL with new URL() and
encodeURIComponent so streamId is always a single encoded path segment (avoid
path traversal or wrong endpoints). Locate the streamUrl construction (variable
streamUrl and the surrounding use of opts.baseUrl and opts.streamId) and replace
the concatenation with building a URL object from opts.baseUrl and appending `/`
+ encodeURIComponent(opts.streamId) (or using url.pathname adjustments) so
special chars like '/', '?', '#', and '..' are encoded.
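A sketch of the safer construction, under the assumption that `baseUrl` is a valid absolute URL; note that `encodeURIComponent` leaves `.` unencoded, so an id of `..` would still need explicit rejection:

```typescript
// The stream id becomes a single encoded path segment, so "/", "?", and "#"
// inside it cannot change the target endpoint. `buildStreamUrl` is an
// illustrative name, not the module's real export.
function buildStreamUrl(baseUrl: string, streamId: string): string {
  const url = new URL(baseUrl); // throws early on a malformed base
  const base = url.pathname.endsWith("/")
    ? url.pathname.slice(0, -1)
    : url.pathname;
  url.pathname = `${base}/${encodeURIComponent(streamId)}`;
  return url.toString();
}
```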
In `@durable-streams/operations.ts`:
- Around line 103-113: versionCheck currently returns the recorded version
without validating it against opts.minVersion/opts.maxVersion so old journals
can replay unsupported versions; update the versionCheck generator (the function
named versionCheck which calls createDurableEffect with type "version_gate") to
inspect the persisted value returned by the durable effect and enforce that
opts.minVersion <= value <= opts.maxVersion, and if the persisted value is out
of range either throw a clear error (to stop replay) or resolve an explicit
failure status so callers know the version is unsupported; ensure the check is
applied whenever the effect result is read so raising minVersion later will
prevent accepting an old recorded version.
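The range check can be sketched as a plain function applied wherever the persisted effect result is read; `assertSupportedVersion` is an illustrative helper, not part of the module:

```typescript
// A replayed (persisted) version must fall inside the currently supported
// [minVersion, maxVersion] range, so raising minVersion later rejects
// journals recorded under now-unsupported versions instead of replaying them.
function assertSupportedVersion(
  persisted: number,
  opts: { minVersion: number; maxVersion: number },
): number {
  if (opts.minVersion > opts.maxVersion) {
    throw new Error(
      `invalid range: minVersion ${opts.minVersion} > maxVersion ${opts.maxVersion}`,
    );
  }
  if (persisted < opts.minVersion || persisted > opts.maxVersion) {
    throw new Error(
      `recorded version ${persisted} is outside supported range [${opts.minVersion}, ${opts.maxVersion}]`,
    );
  }
  return persisted;
}
```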
In `@durable-streams/package.json`:
- Around line 16-18: Update the package.json peerDependencies entry for
"effection" in the durable-streams package: replace the current version
constraint "^4" with a combined constraint that allows Effection v3 or v4 (use
"^3 || ^4") so it matches other monorepo packages and accepts both versions
where peerDependencies is defined.
In `@durable-streams/README.md`:
- Around line 477-479: The README example declares const stream twice which
causes a duplicate-declaration error; update the snippet to show the two
construction forms separately (either by using different variable names like
stream and prepopulatedStream, or by presenting them as two independent
examples) so the InMemoryStream constructor with and without existingEvents is
demonstrated without redeclaring the same const.
- Around line 95-106: Update the README wording to qualify the durability
guarantee: clarify that Workflow<T> only yields DurableEffect values at compile
time and that this guarantee holds unless the explicit escape hatch ephemeral()
is used to run non-durable Operations inside a Workflow; reference the Workflow
type and the ephemeral() function by name so readers understand the exception
and rephrase the sentence “if it compiles as a `Workflow`, it's durable” to
indicate “if it compiles as a `Workflow` and you do not use `ephemeral()`, it's
durable.”
- Around line 200-203: Update the README examples to use the actual package
import paths used by the codebase (replace imports from "@effection/effection"
with "effection" for symbols like run, durableRun, and useHttpDurableStream) and
replace any Deno-specific file reads (Deno.readTextFile) with Node equivalents
(e.g. fs.promises.readFile or require('fs').promises.readFile and decode to
string) so the examples run in Node; also fix the in-code example in
file-guard.ts (around the usage at line 72) to use the same Node fs API instead
of Deno.readTextFile.
In `@durable-streams/replay-index.test.ts`:
- Around line 17-51: The test helpers yieldEvent and closeEvent are forcing
value into undefined (value as undefined) due to a type mismatch with
DurableEvent.result.value; fix by updating the helpers to preserve the actual
value type instead of casting to undefined — make yieldEvent and closeEvent
generic (e.g., yieldEvent<T>(...): DurableEvent<T> and closeEvent<T>(...):
DurableEvent<T>) or otherwise type result.value as unknown|T so you pass value
as the correct type (value as T or value as unknown) and remove the incorrect
value as undefined casts; ensure the DurableEvent type signature aligns with the
generic so these helpers type-check without unsafe casts.
In `@durable-streams/replay-index.ts`:
- Around line 54-60: getClose currently returns stored close events even when a
coroutine is disabled, unlike peekYield() and hasClose(); update
getClose(coroutineId: CoroutineId) to first check this.disabled.has(coroutineId)
(or call isReplayDisabled) and return undefined if disabled, matching
hasClose/peekYield behavior, and apply the same guard to the analogous
close-accessor around lines 85-94 so disabled replay uniformly hides stored
close state.
In `@durable-streams/run.ts`:
- Around line 89-119: The early-return fast-path that checks
replayIndex.hasClose(coroutineId) bypasses ReplayGuard checks; fix by creating
the caller scope and installing DurableCtx before that fast-path and then
invoking the replay guard check before returning/throwing: yield* useScope() to
obtain scope, scope.set(DurableCtx, { replayIndex, stream, coroutineId,
childCounter: 0 }), then if replayIndex.hasClose(...) call yield*
runCheckPhase(events, scope) before reading replayIndex.getClose(...) and
returning the closeEvent result or throwing via deserializeError; this ensures
ReplayGuard.check()/decide() runs for full-replay completions.
- Around line 128-139: The current early-return check only compares the cursor
vs yieldCount for the single coroutineId and ignores any orphaned child
coroutines; to fix, extend the check to enumerate all recorded coroutine ids
from replayIndex (e.g., replayIndex.getAllCoroutineIds() or
replayIndex.listCoroutineIds()) and for each id that is a child of coroutineId
(id.startsWith(coroutineId + '.')) compare that child's
replayIndex.getCursor(childId) against replayIndex.yieldCount(childId) and throw
EarlyReturnDivergenceError(childId, cursor, totalYields) if cursor <
totalYields; keep the existing skip when
replayIndex.isReplayDisabled(coroutineId). Ensure you reference and use
replayIndex.getCursor, replayIndex.yieldCount and construct the error with the
specific child coroutine id.
- Around line 164-172: The code currently swallows errors from
stream.append(closeEvent) inside durableRun, which can leave the journal missing
the terminal event; change this so that when durableRun would otherwise succeed
any append failure is propagated (i.e., do not swallow errors from stream.append
in the success path), and when durableRun is already failing, do not drop the
append error but surface both errors (e.g., by attaching the append error as a
secondary/suppressed error or by combining messages) so callers see both the
original workflow error and the terminal-event persistence failure; update the
block around stream.append(closeEvent) in durableRun to rethrow or combine
errors rather than ignoring them.
In `@durable-streams/stream.ts`:
- Around line 75-79: The reset() helper currently resets events, appendCount,
and injectFailure but leaves the onAppend callback intact; update
InMemoryStream.reset() to also clear the onAppend property (set this.onAppend =
null or undefined) so that callbacks/side effects don't leak between tests —
look for the reset method on the InMemoryStream class and add clearing of
onAppend alongside the existing resets.
- Around line 50-76: The stream currently only copies the array shell so callers
can mutate stored DurableEvent objects; update constructor, readAll(), append(),
snapshot(), and reset() to clone DurableEvent objects when storing into or
returning from this.events so the stream keeps append-only immutable entries.
Specifically: in constructor and reset, push clones of initialEvents into
this.events; in append, call onAppend with a clone (or clone before calling) and
store a clone into this.events; in readAll and snapshot, return an array of
clones (not the original object references). Use the project's preferred
deep-copy utility (e.g., structuredClone or existing clone helper) to perform
cloning consistently.
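The defensive-copy behavior can be sketched with `structuredClone` (available in Node 17+); the `DurableEvent` shape here is a stand-in, not the package's real type:

```typescript
// Events are cloned on the way in and on the way out, so callers can neither
// mutate journal entries after appending nor mutate them via readAll().
type DurableEvent = { type: string; result?: { value?: unknown } };

class CloningStore {
  private events: DurableEvent[] = [];

  append(event: DurableEvent): void {
    this.events.push(structuredClone(event)); // store a private copy
  }

  readAll(): DurableEvent[] {
    return this.events.map((e) => structuredClone(e)); // hand out copies
  }
}
```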
In `@durable-streams/structured-concurrency.test.ts`:
- Around line 262-279: The test's race relies on immediate promise resolution
(durableCall("fast", tracker.fn("fast", ...)) vs the slow branch) causing
nondeterministic wins; change the test to use a controlled gate/deferred so the
intended winner (e.g., the generator labeled "fast") only resolves when you
explicitly release the gate, and make the losing branch (e.g.,
"slow-step1"/"slow-step2") wait on a separate gate so it remains blocked until
cancellation; update the durableRun/durableRace callers in this snippet (and the
similar cases around lines 306-320, 503-512, 551-563) to use the new
deferred/gate primitives to deterministically release the winner and verify
cancellation behavior rather than relying on scheduling order.
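A minimal deferred/gate primitive of the kind suggested here; the test releases the intended winner explicitly instead of relying on timer ordering:

```typescript
// The test holds each branch on its own gate: the winner's gate is opened
// when the test is ready, and the loser's gate stays closed so its branch
// remains blocked until cancellation.
function createGate(): { open: () => void; wait: Promise<void> } {
  let open!: () => void;
  const wait = new Promise<void>((resolve) => {
    open = resolve;
  });
  return { open, wait };
}
```

A race test then gives the "fast" branch one gate and the "slow" branch another, opens only the first, and asserts that the slow branch was cancelled rather than scheduled.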
In `@package.json`:
- Around line 34-36: The overrides entry currently pins effection to
"4.1.0-alpha.7" (overrides -> "effection": "4.1.0-alpha.7"); create a tracking
issue to remove this override once effection@4.1.0 is released and record that
issue number next to the override so it’s visible to maintainers (e.g. add a
short non-functional note key or README note referencing the issue), and update
the PR/commit message to mention the tracking issue; ensure the change
references the overrides key and the effection version so future reviewers can
find and remove the override when the stable release lands.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 023fc537-e77f-4a21-9a77-cfe48e804ade
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (36)
durable-streams/README.md
durable-streams/combinators.ts
durable-streams/context.ts
durable-streams/deterministic-id.test.ts
durable-streams/divergence-api.test.ts
durable-streams/divergence.test.ts
durable-streams/divergence.ts
durable-streams/durable-each.test.ts
durable-streams/durable-run.test.ts
durable-streams/each.ts
durable-streams/effect.ts
durable-streams/ephemeral.test.ts
durable-streams/ephemeral.ts
durable-streams/errors.ts
durable-streams/file-guard.ts
durable-streams/http-stream.ts
durable-streams/mod.ts
durable-streams/operations.ts
durable-streams/package.json
durable-streams/replay-guard.test.ts
durable-streams/replay-guard.ts
durable-streams/replay-index.test.ts
durable-streams/replay-index.ts
durable-streams/run.ts
durable-streams/serialize.ts
durable-streams/smoke.test.ts
durable-streams/stream.ts
durable-streams/structured-concurrency.test.ts
durable-streams/tsconfig.json
durable-streams/types.test.ts
durable-streams/types.ts
effect-ts/effect-runtime.ts
effect-ts/package.json
package.json
pnpm-workspace.yaml
tsconfig.json
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durable-streams/demo/cook.ts`:
- Around line 178-185: The startup code is performing a redundant
stream.readAll() and logging before calling durableRun (which already calls
stream.readAll() internally); remove the pre-read block (the existing = yield*
stream.readAll() check and its log branches) and simply call return yield*
durableRun(cookDinner, { stream }); so only durableRun performs the journal
read/replay (or, if you need startup logging, move that logic into durableRun or
a single helper so there’s only one readAll call).
- Around line 191-193: The catch block currently assumes the thrown value is an
Error and logs (err as Error).message; change it to normalize unknown throw
values before logging in the catch that ends with process.exit(1) — detect if
err is an Error and use err.message, otherwise convert the thrown value to a
readable string (e.g., String(err) or JSON.stringify for objects) and include
that normalized value in the processLogger/console.error message so non-Error
throws (strings or plain objects) are not logged as "undefined".
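The normalization can be sketched as a small helper; `describeThrown` is an illustrative name:

```typescript
// Converts an unknown thrown value into a readable string: Error messages
// pass through, plain objects are JSON-stringified, everything else is
// String()-coerced, so non-Error throws never log as "undefined".
function describeThrown(err: unknown): string {
  if (err instanceof Error) return err.message;
  if (typeof err === "object" && err !== null) {
    try {
      return JSON.stringify(err);
    } catch {
      return String(err); // circular objects fall back to coercion
    }
  }
  return String(err);
}
```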
In `@durable-streams/demo/server.ts`:
- Around line 20-23: The SIGINT handler calls server.stop() without awaiting,
which can abort async cleanup; update the process.on("SIGINT", ...) handler to
await or chain the Promise returned by DurableStreamTestServer.stop() (e.g.,
make the callback async and await server.stop() or call server.stop().then(() =>
process.exit(0))). Ensure you still call process.exit(0) only after
server.stop() resolves and handle rejection (catch and log error before
exiting).
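The awaited-shutdown flow can be sketched as a testable helper plus a thin signal hook; `shutdown` and the `server` shape are illustrative:

```typescript
// Exit only after stop() settles: a clean stop maps to exit code 0, and a
// failed cleanup is logged and mapped to a non-zero exit code.
async function shutdown(server: { stop: () => Promise<void> }): Promise<number> {
  try {
    await server.stop(); // wait for async cleanup to finish
    return 0;
  } catch (err) {
    console.error("shutdown failed:", err);
    return 1;
  }
}

// Example wiring (illustrative):
// process.on("SIGINT", () => { void shutdown(server).then((code) => process.exit(code)); });
```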
In `@durable-streams/demo/start.sh`:
- Around line 91-92: Replace the unsafe global pkill with a pane-scoped kill
that targets the tmux pane's process tree: instead of running pkill -9 -f
'demo/cook.ts' via tmux send-keys for "${SESSION}:0.3", obtain that pane's PID
(tmux #{pane_pid}) and kill its child processes (e.g., using kill or pkill -P
against the pane PID) so only the cook pane and its children are terminated;
update the tmux send-keys invocation in start.sh (the line sending the kill to
"${SESSION}:0.3") and mirror the same safer pane-scoped command in the README.
In `@durable-streams/demo/tail.ts`:
- Around line 71-112: The polling loops currently catch all errors and swallow
them; update both loops (the "streamReady" startup loop and the main poll loop
around fetchStream) to inspect caught errors (e.g., DurableStreamError /
FetchError) and only treat NOT_FOUND/404 as the transient "stream not ready"
case to retry after POLL_MS, while rethrowing or logging and exiting for other
error codes/statuses such as UNAUTHORIZED/401, FORBIDDEN/403, or other
protocol/config errors; reference the fetchStream call, the streamReady flag,
lastOffset, and POLL_MS when making the change so non-404 failures are surfaced
instead of silently ignored.
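The error classification can be sketched as below; the `status`/`code` fields mirror the structured errors mentioned in the comment, but the shapes are illustrative stand-ins:

```typescript
// Only "stream not ready" (404 / NOT_FOUND) is transient and worth retrying
// after POLL_MS; auth and other protocol errors surface immediately.
function isTransientNotReady(err: { status?: number; code?: string }): boolean {
  return err.status === 404 || err.code === "NOT_FOUND";
}

function shouldRetry(err: { status?: number; code?: string }): boolean {
  if (isTransientNotReady(err)) return true;
  if (err.status === 401 || err.status === 403) return false; // auth: fail fast
  return false; // default: surface the error instead of silently polling
}
```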
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 87628bc2-ef10-4e54-b6d8-9ec307bdd2b0
📒 Files selected for processing (5)
durable-streams/demo/README.md
durable-streams/demo/cook.ts
durable-streams/demo/server.ts
durable-streams/demo/start.sh
durable-streams/demo/tail.ts
Actionable comments posted: 5
♻️ Duplicate comments (6)
durable-streams/demo/tail.ts (1)

29-33: 🧹 Nitpick | 🔵 Trivial

Consider using structured error types instead of string parsing.

The `isNotFoundError` function parses error messages to detect 404s, which is fragile. The `@durable-streams/client` library provides `DurableStreamError` with a structured `code` field (e.g., `"NOT_FOUND"`) and `FetchError` with a `status` property. For a demo this works, but using the structured types would be more reliable.

♻️ Suggested improvement

```diff
+import { DurableStreamError, FetchError } from "@durable-streams/client";

-function isNotFoundError(error: unknown): boolean {
-  const message = error instanceof Error ? error.message : String(error);
-  const upper = message.toUpperCase();
-  return upper.includes("NOT_FOUND") || upper.includes("404");
+function isNotFoundError(error: unknown): boolean {
+  if (error instanceof DurableStreamError) {
+    return error.code === "NOT_FOUND";
+  }
+  if (error instanceof FetchError) {
+    return error.status === 404;
+  }
+  return false;
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@durable-streams/demo/tail.ts` around lines 29-33: replace fragile string parsing in `isNotFoundError` by checking structured error types: first test if the error is a `DurableStreamError` (from `@durable-streams/client`) and return `error.code === "NOT_FOUND"`; next check for `FetchError` and return `error.status === 404`; finally keep the existing message-based fallback for unknown shapes. Update `isNotFoundError` and its imports to use `DurableStreamError` and `FetchError` type guards or `instanceof` checks so the logic is reliable and preserves backward compatibility.

durable-streams/replay-index.ts (1)
122-125: ⚠️ Potential issue | 🔴 Critical

Don't filter out closed coroutines in `firstUnconsumed()`.

A previously recorded child usually has both `yield` and `close`. The `this.closes.has(coroutineId)` skip makes those orphaned branches invisible, so `durableRun()` can miss early-return divergence when new code stops spawning that child. Only replay-disabled coroutines should be excluded here.

Minimal fix

```diff
 for (const [coroutineId, entries] of this.yields.entries()) {
   if (this.disabled.has(coroutineId)) continue;
-  if (this.closes.has(coroutineId)) continue;
   const cursor = this.cursors.get(coroutineId) ?? 0;
   if (cursor < entries.length) {
     return { coroutineId, cursor, totalYields: entries.length };
   }
 }
```

As per coding guidelines, `.policies/correctness-invariants.md`: validate inputs; test success/error/halt paths.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@durable-streams/replay-index.ts` around lines 122-125: in `firstUnconsumed()`, don't skip coroutineIds that are present in `this.closes`; only skip replay-disabled coroutines in `this.disabled`. Remove the `if (this.closes.has(coroutineId)) continue` check so entries from previously closed children (`this.yields` keys) are considered when computing the earliest unconsumed cursor, ensuring `durableRun()` can detect early-return divergence; locate the loop over `this.yields` in the `firstUnconsumed` method and adjust the filtering accordingly.

durable-streams/divergence-api.test.ts (1)
70-80: ⚠️ Potential issue | 🟡 Minor

Drive the continue-past-close case through `durableRun()`.

This still only invokes `Divergence` directly, so it can pass even if the runtime stops surfacing `ContinuePastCloseDivergenceError` during replay. Build a recorded journal with a `close`, replay it through `durableRun()`, and assert the thrown type from that runtime path instead.

As per coding guidelines, `.policies/correctness-invariants.md`: validate inputs; test success/error/halt paths.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@durable-streams/divergence-api.test.ts` around lines 70-80: the test currently calls `Divergence` directly; change it to exercise the runtime path by building a recorded journal containing a close event and replaying it through `durableRun` (using `useScope` to get the scope as before) so the runtime throws the error during replay. Invoke `durableRun` with the recorded journal that includes the "close" entry, await the result, and assert that `durableRun` rejects/throws with an instance of `ContinuePastCloseDivergenceError`; keep `useScope`, replace the direct call with the `durableRun` replay, and validate success/error/halt paths per the correctness invariants.

durable-streams/run.ts (1)
107-119: ⚠️ Potential issue | 🔴 Critical

The root close fast-path still skips `ReplayGuard.decide()`.

`runCheckPhase()` now precomputes guard state, but this branch returns the stored `Close` before any recorded `yield` is sent through `ReplayGuard.decide()`. Full replays therefore never compare cached observations against stored results, so guards like `useFileContentGuard()` still accept stale completed journals silently.

As per coding guidelines, `.policies/correctness-invariants.md`: validate inputs; test success/error/halt paths.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@durable-streams/run.ts` around lines 107-119: the fast-path that returns the stored `Close` when `replayIndex.hasClose(coroutineId)` is true skips guard validation. Before returning or throwing based on `replayIndex.getClose(coroutineId)`, ensure the stored `Close` is passed through the replay guard decision logic (i.e., invoke `ReplayGuard.decide()` or use the precomputed guard state from `runCheckPhase()`) so cached observations are validated against stored results; update the branch using `replayIndex.hasClose`/`getClose` to call the same guard/decision path used during full replay and only return the result if `ReplayGuard.decide()` permits it (otherwise treat it as a mismatch and re-run or throw accordingly).

durable-streams/demo/cook.ts (1)
9-10: ⚠️ Potential issue | 🟡 Minor

Update the crash instruction to the pane-scoped kill flow.

Lines 9-10 still tell readers to run the global `pkill -9 -f "demo/cook.ts"` pattern that was already removed from the launcher. Point users to the control pane in `demo/start.sh` (or another targeted kill) so the demo docs do not take down unrelated runs.

As per coding guidelines, `.policies/documentation.md`: document lifecycle/cancellation semantics.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@durable-streams/demo/cook.ts` around lines 9-10: update the crash instruction text (the lines describing how to kill the process) to stop recommending the global `pkill -9 -f "demo/cook.ts"` and instead point users to kill only the control pane in `demo/start.sh` (or another targeted, pane-scoped tmux/kill command); also ensure this lifecycle/cancellation change is reflected in the project documentation per `.policies/documentation.md`.

durable-streams/operations.ts (1)
103-123: 🧹 Nitpick | 🔵 Trivial

Version range validation is now enforced; consider adding upfront range validation.

The fix correctly validates replayed versions against the supported range (lines 115-120). However, an invalid configuration like `{ minVersion: 5, maxVersion: 3 }` would persist version 3 during live execution, then immediately throw because `3 < 5`. Adding an upfront check would provide a clearer error:

💡 Optional: Add upfront range validation

```diff
 export function* versionCheck(
   name: string,
   opts: { minVersion: number; maxVersion: number },
 ): Workflow<number> {
+  if (opts.minVersion > opts.maxVersion) {
+    throw new Error(
+      `versionCheck("${name}"): minVersion (${opts.minVersion}) cannot exceed maxVersion (${opts.maxVersion})`,
+    );
+  }
+
   const version = (yield createDurableEffect<number>(
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@durable-streams/operations.ts` around lines 103-123: `versionCheck` currently validates the replayed version but doesn't validate the provided range config (`opts.minVersion <= opts.maxVersion`) up front; add a guard at the start of `versionCheck` to immediately throw a clear error if `opts.minVersion > opts.maxVersion` so misconfigured ranges are caught before calling `createDurableEffect` and attempting to resolve a persisted version.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durable-streams/file-guard.ts`:
- Around line 69-83: The example currently yields durableCall in the parent
workflow before durableRun installs the durable context; change it so the parent
Operation installs the guard via yield* useFileContentGuard() and then invokes
yield* durableRun(function* innerWorkflow() { /* perform durableCall here */ },
{ stream }) so that durableCall executes inside innerWorkflow under the
durableRun context; specifically move the durableCall invocation into the
innerWorkflow generator (reference durableCall, durableRun, useFileContentGuard,
innerWorkflow, workflow) and ensure the { stream } option is still passed to
durableRun.
In `@durable-streams/README.md`:
- Around line 64-73: Add the missing fence language specifier to the shown
Markdown code blocks: change the triple-backtick fences around the plain text
examples (the block starting with "[0] yield root ..." and the other block at
lines 237-242) to use ```text so they are lint-clean; update both fenced code
blocks in durable-streams/README.md to include the language marker.
- Around line 406-425: The example omits how the path is stored on the effect
description, so update the durableReadFile example to pass the path as effect
metadata so event.description.path is populated; specifically show calling
durableCall with the path included in the description metadata (eg. attach {
path } as the effect description or use the durableCall overload that accepts a
description object) so the journaled description contains { type: "call", name:
"readFile", path }, and reference durableReadFile, durableCall and
event.description.path (and the guard useFileContentGuard) in the explanation so
readers can reproduce the journal entry and the guard check.
- Around line 491-500: The example uses yield* with useHttpDurableStream but
isn't inside a generator; wrap the snippet in a generator context (e.g.,
run(function* () { ... }) or an async generator runner) so yield* inside the
body is valid—ensure the call to useHttpDurableStream stays unchanged and only
its surrounding context is replaced with a generator function containing the
shown import and const stream = yield* useHttpDurableStream(...) code.
In `@durable-streams/run.ts`:
- Around line 151-177: The code constructs a normalized Error as `primary` when
handling the catch for the workflow failure but then rethrows the original
`error`, causing live runs to differ from replayed `Close` events; change the
logic in the catch block so that after successfully appending `closeEvent` (via
`stream.append`) you rethrow the normalized `primary` Error (the same Error used
in the `Close` event) instead of `error`, keeping `Close`, `closeEvent`,
`primary`, `AggregateError`, and `stream.append` behavior intact.
---
Duplicate comments:
In `@durable-streams/demo/cook.ts`:
- Around line 9-10: Update the crash instruction text in demo/cook.ts (the lines
describing how to kill the process) to stop recommending the global pkill -9 -f
"demo/cook.ts" and instead point users to kill only the control pane in
demo/start.sh (or another targeted pane/command) e.g., instruct them to use the
control pane from demo/start.sh or a pane-scoped tmux/kill command; also ensure
this lifecycle/cancellation change is reflected in the project documentation per
.policies/documentation.md.
In `@durable-streams/demo/tail.ts`:
- Around line 29-33: Replace fragile string parsing in isNotFoundError by
checking structured error types: first test if error is a DurableStreamError
(from `@durable-streams/client`) and return error.code === "NOT_FOUND"; next check
for FetchError and return error.status === 404; finally keep the existing
message-based fallback for unknown shapes. Update the function name
isNotFoundError and its imports/refs to use DurableStreamError and FetchError
type guards or instanceof checks so the logic is reliable and preserves backward
compatibility.
In `@durable-streams/divergence-api.test.ts`:
- Around line 70-80: The test currently calls Divergence.invoke directly; change
it to exercise the runtime path by building a recorded journal containing a
close event and replaying it through durableRun (using useScope to get scope as
before) so the runtime throws the error during replay; invoke durableRun with
the recorded journal that includes the "close" entry, await the result and
assert that durableRun rejects/throws with an instance of
ContinuePastCloseDivergenceError (keep useScope, Divergence.invoke only for
reference — replace the direct call with the durableRun replay), and validate
both success/error/halt paths per correctness-invariants.
In `@durable-streams/operations.ts`:
- Around line 103-123: versionCheck currently validates the replayed version but
doesn’t validate the provided range config (opts.minVersion <= opts.maxVersion)
up front; add a guard at the start of the versionCheck function to immediately
throw a clear error if opts.minVersion > opts.maxVersion (e.g., `throw new
Error(...)`) so misconfigured ranges are caught before calling
createDurableEffect and attempting to resolve a persisted version.
In `@durable-streams/replay-index.ts`:
- Around line 122-125: In firstUnconsumed(), don't skip coroutineIds that are
present in this.closes — only skip replay-disabled coroutines in this.disabled;
remove the if (this.closes.has(coroutineId)) continue check so entries from
previously closed children (this.yields keys) are considered when computing the
earliest unconsumed cursor, ensuring durableRun() can detect early-return
divergence; locate the loop over this.yields in the firstUnconsumed method and
adjust the filtering accordingly.
In `@durable-streams/run.ts`:
- Around line 107-119: The fast-path that returns the stored Close when
replayIndex.hasClose(coroutineId) is true skips guard validation; before
returning or throwing based on replayIndex.getClose(coroutineId), ensure the
stored Close is passed through the replay guard decision logic (i.e., invoke
ReplayGuard.decide() or use the precomputed guard state from runCheckPhase()) so
cached observations are validated against stored results; update the branch in
run (the block using replayIndex.hasClose/getClose) to call the same
guard/decision path used during full replay and only return the result if
ReplayGuard.decide() permits it (otherwise act as a mismatch and re-run or throw
accordingly).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: c22e9bd1-289a-46c3-aa62-076d544076b4
📒 Files selected for processing (19)
durable-streams/README.md
durable-streams/demo/README.md
durable-streams/demo/cook.ts
durable-streams/demo/server.ts
durable-streams/demo/start.sh
durable-streams/demo/tail.ts
durable-streams/divergence-api.test.ts
durable-streams/divergence.test.ts
durable-streams/effect.ts
durable-streams/errors.ts
durable-streams/file-guard.ts
durable-streams/http-stream.ts
durable-streams/operations.ts
durable-streams/package.json
durable-streams/replay-index.test.ts
durable-streams/replay-index.ts
durable-streams/run.ts
durable-streams/stream.ts
durable-streams/tsconfig.json
durable-streams/file-guard.ts
Outdated
```ts
function* workflow(): Operation<void> {
  // Install the guard — children inherit it
  yield* useFileContentGuard();

  // Effects store path in description, hash in result
  const { content } = yield* durableCall("resolve", async () => {
    const data = await readFile("./input.txt", "utf8");
    return { content: data, contentHash: sha256(data) };
  });
  // description: { type: "call", name: "resolve", path: "./input.txt" }
  // result: { status: "ok", value: { content: "...", contentHash: "sha256:..." } }

  yield* durableRun(innerWorkflow, { stream });
}
```
Fix the example so durableCall() runs inside the durableRun() workflow.
Lines 75-78 call a durable effect before durableRun() has installed the durable context, so this snippet currently demonstrates an invalid setup path. Show a parent Operation that does yield* useFileContentGuard() and then yield* durableRun(function* innerWorkflow() { ...durableCall... }, { stream }) instead.
As per coding guidelines, .policies/documentation.md - Document lifecycle/cancellation semantics.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durable-streams/file-guard.ts` around lines 69 - 83, The example currently
yields durableCall in the parent workflow before durableRun installs the durable
context; change it so the parent Operation installs the guard via yield*
useFileContentGuard() and then invokes yield* durableRun(function*
innerWorkflow() { /* perform durableCall here */ }, { stream }) so that
durableCall executes inside innerWorkflow under the durableRun context;
specifically move the durableCall invocation into the innerWorkflow generator
(reference durableCall, durableRun, useFileContentGuard, innerWorkflow,
workflow) and ensure the { stream } option is still passed to durableRun.
N/A — the file-guard.ts file was moved to the @effectionx/durable-effects package. The in-code example there uses the correct context.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durable-streams/demo/README.md`:
- Around line 28-34: The fenced ASCII diagram block in README.md lacks a
language specifier; update the triple-backtick fence around the diagram to
include a plain text specifier (use ```text or ```plaintext) so the code block
becomes ```text followed by the ASCII art and a closing ```, ensuring consistent
markdown linting and rendering of the diagram.
In `@durable-streams/demo/start.sh`:
- Around line 77-78: Replace the hardcoded "sleep 2" readiness wait with a
retry/poll loop that checks the server is actually responsive (e.g., curl or nc
against the server port or a /health endpoint) and exits when ready or after a
timeout; update start.sh to poll the target host:port or health path with short
delays and a max retry count so the demo is robust on slow CI/machines instead
of relying on the fixed sleep.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 4cca7872-6267-4f70-876e-6912f991e5df
📒 Files selected for processing (3)
durable-streams/demo/README.md
durable-streams/demo/observe.ts
durable-streams/demo/start.sh
```sh
# Give the server a moment to bind its port
sleep 2
```
🧹 Nitpick | 🔵 Trivial
Consider a more robust server readiness check.
The hardcoded sleep 2 works for most cases but could be flaky on slower systems or in CI environments. For a demo script this is acceptable, but if reliability becomes an issue, consider polling the server's health endpoint or using a retry loop.
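As a hedged illustration (sketched in TypeScript rather than shell, with a fake `check` standing in for a real health probe such as `curl` against a `/health` endpoint), such a retry loop could look like:

```typescript
// Sketch: poll a readiness check with short delays and a bounded retry
// count instead of a fixed sleep. `check` stands in for probing a real
// host:port or /health endpoint.
async function waitUntilReady(
  check: () => Promise<boolean>,
  { retries = 20, delayMs = 100 }: { retries?: number; delayMs?: number } = {},
): Promise<boolean> {
  for (let attempt = 0; attempt < retries; attempt++) {
    if (await check()) return true; // server answered — proceed
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  return false; // timed out — caller should log an error and abort
}

// Demo: a check that only succeeds on the third attempt.
let attempts = 0;
waitUntilReady(async () => ++attempts >= 3, { delayMs: 1 }).then((ready) =>
  console.log(ready, attempts), // true 3
);
```

The same shape ports directly back to a `while` loop with `curl --fail` and a retry counter in start.sh.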
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durable-streams/demo/start.sh` around lines 77 - 78, Replace the hardcoded
"sleep 2" readiness wait with a retry/poll loop that checks the server is
actually responsive (e.g., curl or nc against the server port or a /health
endpoint) and exits when ready or after a timeout; update start.sh to poll the
target host:port or health path with short delays and a max retry count so the
demo is robust on slow CI/machines instead of relying on the fixed sleep.
Acknowledged — the sleep 2 is a pragmatic choice for a demo script. A more robust readiness check (e.g., polling the server's health endpoint) would add complexity to what's meant to be a simple launcher. The 2-second delay has been reliable in practice.
♻️ Duplicate comments (2)
durable-streams/demo/README.md (1)
28-35: 🧹 Nitpick | 🔵 Trivial
Add a language specifier to the fenced code block.
The ASCII diagram should use `text` or `plaintext` as the language specifier.

````diff
-```
+```text
 ┌─────────────────────┬──────────────────┐
````

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durable-streams/demo/README.md` around lines 28 - 35, The fenced ASCII diagram block is missing a language specifier; change the opening fence from ``` to ```text (or ```plaintext) for the diagram starting with "┌─────────────────────┬──────────────────┐" so the block is marked as plain text and rendered correctly.

durable-streams/demo/start.sh (1)
73-83: 🧹 Nitpick | 🔵 Trivial
The hardcoded `sleep 2` may be flaky on slower systems.
For a demo script this is acceptable, but consider polling the server's health endpoint if reliability becomes an issue in CI or on slower machines.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durable-streams/demo/start.sh` around lines 73 - 83, The hardcoded sleep in start.sh (the "sleep 2" after launching "$NODE demo/observe.ts") is flaky; replace it with a polling loop that waits for the observer to become ready (e.g., repeatedly curl or tcp-check the port/health endpoint exposed by demo/observe.ts until success or a timeout) before pre-typing the cook and pkill commands, and ensure the loop aborts after a reasonable timeout and logs an error if the observer never becomes available.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@durable-streams/demo/README.md`:
- Around line 28-35: The fenced ASCII diagram block is missing a language
specifier; change the opening fence from ``` to ```text (or ```plaintext) for
the diagram starting with "┌─────────────────────┬──────────────────┐" so the
block is marked as plain text and rendered correctly.
In `@durable-streams/demo/start.sh`:
- Around line 73-83: The hardcoded sleep in start.sh (the "sleep 2" after
launching "$NODE demo/observe.ts") is flaky; replace it with a polling loop that
waits for the observer to become ready (e.g., repeatedly curl or tcp-check the
port/health endpoint exposed by demo/observe.ts until success or a timeout)
before pre-typing the cook and pkill commands, and ensure the loop aborts after
a reasonable timeout and logs an error if the observer never becomes available.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 53adbba1-9f2c-4a8b-99ea-db3a67ee168b
📒 Files selected for processing (4)
durable-streams/demo/README.md
durable-streams/demo/observe.ts
durable-streams/demo/start.sh
durable-streams/tsconfig.json
Create new package with PLAN.md, package.json, tsconfig.json, README.md, and placeholder mod.ts. Integrate into monorepo workspace and add effection alpha override for experimental API compatibility. Consumes @effectionx/durable-streams via pkg-pr-new preview URL from PR #179.
Port durable execution for Effection from ctp-durably into the effectionx monorepo. Crash-safe generator workflows that survive process restarts by journaling effects to an append-only stream.
- 17 source files ported from Deno to Node (flat structure)
- 11 test files (95 tests, all passing)
- Monorepo-wide effection 4.1.0-alpha.7 override for scope.around()
- Cherry-pick effect-ts teardown fix for alpha compatibility
- README documenting mental model, journal protocol, replay guards, divergence detection, and stream backends
Use a shared WorkflowValue alias (Json | void) in durableRun and structured concurrency combinators so Workflow<void> callbacks type-check under strict tsc checks. Session-ID: ses_33ef1d402ffeBK425121WZG7jG
Add a committable Node-based demo with a README that documents tmux and manual run flows so contributors can reproduce replay and crash-recovery behavior locally. Session-ID: ses_33ef1d402ffeBK425121WZG7jG
Remove the stale ../fs reference from durable-streams/tsconfig.json so check:tsrefs matches the actual workspace imports and CI passes. Session-ID: ses_33ef1d402ffeBK425121WZG7jG
Address PR review suggestions: harden replay safety (run check phase before close fast-path, cross-coroutine early-return detection), deep-clone InMemoryStream events, safe URL construction, normalize file-guard hashes, validate versionCheck range, use distinct error subclass names, improve demo robustness, and update README examples. Session-ID: ses_33ef1d402ffeBK425121WZG7jG
Replace imperative async/await patterns with structured concurrency:
- server.ts: use resource() for server lifecycle, main() as entrypoint,
suspend() to keep alive — teardown is automatic on SIGINT/SIGTERM
- tail.ts: replace polling loop with SSE live mode via @durable-streams/client
stream({ live: true }), bridge ReadableStream into Effection subscription
using createChannel + spawn, consume with each() pattern
Both scripts now get clean shutdown for free via Effection's scope teardown.
… with colors
Combine demo/server.ts and demo/tail.ts into demo/observe.ts — a single process that starts the durable streams server and tails the journal via SSE with ANSI color-coded output (cyan for yields, magenta for closes, green/red/yellow for ok/err/cancelled status).
Simplify tmux layout from 4 panes to 3:
- Observer (server + journal tail)
- Cook workflow (focused)
- Control (kill command)
Cook pane now takes the full left side for maximum visibility. Observer (80%) and control (20%) stack on the right.
The onError handler was checking for error.code === 'NOT_FOUND' but the client throws a FetchError with status 404. Check instanceof FetchError + status === 404 instead, with a 500ms delay before retry. Swap tmux layout: observer (server + journal) on the left, cook pane full-height on the right for better presentation flow.
The previous kill command targeted the entire process group (kill -9 -PGID) which also killed the pane shell, closing the tmux pane. Now uses pgrep -P to find the child node process and kills only that, leaving the shell alive so the presenter can re-run the cook command.
Previous approach using pgrep -P only found direct children of the shell, missing node subprocesses. Use pkill -9 -f 'demo/cook.ts' to reliably kill all processes matching the cook script.
…guard Move DurableRuntime interface + DurableRuntimeCtx context into durable-streams as the canonical location for the runtime abstraction. Remove file-guard.ts (useFileContentGuard) — replaced by the platform-agnostic version in @effectionx/durable-effects that uses DurableRuntime instead of Node-specific crypto/fs imports.
…d changes
- Add DurableRuntime section explaining context-based runtime pattern
- Replace built-in file content guard section with reference to @effectionx/durable-effects pre-built guards
- Update import examples to use @effectionx/durable-effects
- Enhanced comments explaining description comparison design (type+name only)
- Rethrow normalized primary Error instead of raw error in run.ts
- Added JSDoc explaining why closed coroutines are skipped in replay-index
- Expanded comments documenting activeState limitation with TODO in each.ts
- Added comment explaining replay-guard is NOT bypassed by child Close fast-path
- Added upfront minVersion > maxVersion validation for versionCheck()
- Replaced global pkill with pane-scoped kill in demo start.sh
- Updated crash instruction text in demo cook.ts
- Fixed fenced code block language specifiers in README.md and demo/README.md
Add 4 specification documents from the original ctp-durably design repo:
- protocol-specification.md: Two-event durable execution protocol spec
- durable-streams.md: Analysis of Durable Streams as execution log backend
- effection-integration.md: Architecture research mapping protocol to Effection
- DECISIONS.md: 34 architectural decisions (DEC-001 through DEC-034)

References updated for the effectionx monorepo context:
- DEC-001 (Deno) and DEC-004 (@std/assert) marked as superseded
- DEC-002 updated with alpha.7 pin and tracking issue #181
- DEC-003 updated to reflect two-package split
- Project-local lib/ and test/ paths updated to package-root layout
…me interface
Adds a stat(path) method that returns file metadata without reading
contents. Returns { exists: false, isFile: false, isDirectory: false }
for missing paths instead of throwing — existence checks should not
require try/catch. Permission errors still propagate.
Motivation
I want to make it possible to write crash-safe generator workflows with structured concurrency using Effection and Durable Streams
The intended production backend is Durable Streams — an append-only HTTP streaming protocol with idempotent producers and epoch-based zombie fencing, developed by ElectricSQL.
`useHttpDurableStream` handles the serialization requirements that make durable execution safe with this protocol:
- `fetch()` with an explicit `await` — no `lingerMs` batching, because the persist-before-resume invariant requires each `Yield` event to be durably acknowledged before the generator advances.
- `durableAll` children are serialized through an internal Effection queue + worker, so sequence numbers are assigned and sent in order without application-level coordination.
- `StaleEpochError` (on 403) halts execution immediately, preventing a zombie scheduler from corrupting another instance's journal.

The `DurableStream` interface is abstract — `InMemoryStream` ships for tests, and any append-only store can be adapted.

Workflows express themselves as normal generator functions; the runtime journals every effect resolution to an append-only stream. On restart, stored results are replayed directly into the generator without re-executing any side effects. When the journal runs out, execution transitions to live mode. The generator never knows which mode it's in.
If the process crashes between `chargeCard` and `fulfillOrder`, the workflow resumes exactly from that point. `chargeCard` is not called again.

Approach
Two packages
This PR introduces two packages that work together:
`@effectionx/durable-streams` — Core protocol
The durable execution protocol: event types, replay engine, divergence detection, replay guard API, stream backends.
Also owns the `DurableRuntime` interface — a platform-agnostic runtime abstraction that effects use for I/O (file reads, subprocess execution, HTTP requests). The runtime is installed via Effection context (`scope.set(DurableRuntimeCtx, nodeRuntime())`) before calling `durableRun`.

`@effectionx/durable-effects` — Concrete effects + guards
Built on durable-streams, provides:
- `durableExec`, `durableReadFile`, `durableGlob`, `durableFetch`, `durableEval`, `durableResolve` (+ convenience wrappers `durableNow`, `durableUUID`, `durableEnv`)
- `useFileContentGuard`, `useGlobContentGuard`, `useCodeFreshnessGuard`
- `nodeRuntime()` for Node.js, `stubRuntime()` for testing
- `computeSHA256` using Web Crypto API

The journal
The protocol uses exactly two event types:
- `Yield` — written after a user-facing effect resolves, recording the effect description and result.
- `Close` — written when a coroutine terminates (completed, errored, or cancelled). These are load-bearing on restart: they tell the runtime which coroutines finished cleanly and which need re-execution from their last recorded yield.
Type safety:
Workflow<T>vsOperation<T>Workflow<T>is a generator type constrained to only yieldDurableEffectvalues. TypeScript enforces this at compile time — yielding a plain EffectionOperationinside aWorkflowis a type error:Every
Workflow<T>is structurally compatible withOperation<T>, so workflows compose freely into larger Effection programs.ephemeral(op)is the explicit escape hatch when infrastructure operations are needed inside a workflow — it's transparent to the journal and re-runs on replay.Divergence detection and replay guards
During replay, every yielded effect's `{ type, name }` is validated against the journal entry at that cursor position. A mismatch means the code has changed in a way that makes the stored history invalid, and a `DivergenceError` is raised.

For intentional code changes, `versionCheck` embeds a version gate directly in the workflow so old in-flight instances and new instances can coexist in the same codebase.
@effectionx/durable-effectspackage provides ready-to-use guards:Guards follow a two-phase model —
checkruns before replay starts and may perform I/O (hashing files, checking timestamps);decideruns synchronously during the replay loop and makes a pure decision from cached observations. Custom guards install viascope.around(ReplayGuard, ...)and compose vianext()chaining.Test results: 96 durable-streams tests + 45 durable-effects tests = 141 tests passing.