feat(hydromancer): add l2Book WebSocket order-book surface#127
feat(hydromancer): add l2Book WebSocket order-book surface#127jaspersagent wants to merge 4 commits into
Conversation
897d716 to
e89a9e7
Compare
| yield* Effect.sleep(Duration.millis(0)); | ||
| yield* Effect.sleep(Duration.millis(0)); |
There was a problem hiding this comment.
Why do we need this? Looks to me like the flush we had before also didn't do anything.
There was a problem hiding this comment.
Right that the old flush() was a no-op for tests that only awaited the runPromise result. These sleeps weren't, because we now observe FakeWebSocket.instances[0] from inside the program: the daemon owns the socket and the layer scope tears it down on exit, so the forkDaemon fiber has to be scheduled before the first new WebSocket(...) is visible.
Replaced with a waitForSocket polling helper + Effect.yieldNow(), so the reason for each wait is explicit.
| it("returns the seeded snapshot for a pre-subscribed coin", async () => { | ||
| const config: HydromancerModuleConfig = { | ||
| ...baseConfig, | ||
| l2BookSubscriptionCoins: ["BTC"], | ||
| }; |
There was a problem hiding this comment.
This doesn't test that it actually sends a subscription message for the initial coins.
There was a problem hiding this comment.
Good catch. Added an explicit expect(sentOnOpen).toEqual([buildSubscribeFrame("l2Book", "BTC")]) before the response check; the test now fails if the subscribe pipeline becomes a no-op.
| // Consecutive onExpire failures per key. A key that keeps failing is | ||
| // force-removed once it reaches the cap so the map cannot grow forever. |
There was a problem hiding this comment.
Ok great, now the map won't grow forever but whichever other process relied on this callback to clean up resources is now most likely leaking something that's probably bigger than a key value pair in a map.
Maybe it makes more sense to use this counter to space out retries? So the first X times we attempt it and log a warning, after X we log an error and try every Y (or some capped exponential backoff)
There was a problem hiding this comment.
Agreed. Reworked: a failing onExpire keeps its entry forever and the counter only escalates log severity (warn for the first 3, error after). Retries happen on every cleanup pass at the configured interval, so a transient downstream failure resolves itself. Added a test that drives 5 failing passes and asserts the entry persists.
| const getOrWaitOrEvict = (key: K) => | ||
| getOrWaitPrice(key).pipe( | ||
| Effect.catchTag("FailedToGetPriceError", (error) => | ||
| Effect.zipRight(deletePrice(key), Effect.fail(error)), | ||
| ), | ||
| ); |
There was a problem hiding this comment.
I think this replaces all uses of getOrWaitPrice, so we can clean that up.
There was a problem hiding this comment.
Done. getOrWaitPrice is a closure-local helper now, not exported. Rewrote price-cache.test.ts against the wrappers (tryGetOrWait, getOrWaitOrEvict); the waiter mechanism is still fully covered via the public surface.
| Effect.sync(() => { | ||
| MutableHashMap.set(entries, key, { value, lastUpdate: now }); |
There was a problem hiding this comment.
Is there a reason to make these functions effects?
There was a problem hiding this comment.
No good reason. They were all Effect.sync. Dropped the wrappers: set/get/remove are plain sync functions now, callers stopped yielding on them. The factory still returns Effect.Effect<FreshnessCache<K,V>> so tests construct it the same way.
| * stale or absent key returns `None`, leaving the caller free to fall back | ||
| * to a pull such as a REST fetch. It has no waiters and no timeouts. | ||
| */ | ||
| export interface FreshnessCache<K, V> { |
There was a problem hiding this comment.
I don't think we're using this interface outside of this file?
There was a problem hiding this comment.
It is. ws-client.ts:127 declares createHydromancerWS(config, assetCache: FreshnessCache<string, AssetCtx>, ...), so the type import is genuinely consumed. Keeping it exported.
| // Only record sockets this test file opened. A leaked WS daemon from | ||
| // another test file also constructs FakeWebSockets; filtering by URL | ||
| // keeps those out of `instances` so the reconnect tests stay reliable. | ||
| if (url.includes("wsclient.test")) { | ||
| FakeWebSocket.instances.push(this); | ||
| } |
There was a problem hiding this comment.
I'm not sure how this would happen, given that each test file seems to define their own fake WS class.
There was a problem hiding this comment.
Each file has its own FakeWebSocket class, but createHydromancerWS forks its connect loop via Effect.forkDaemon, which is intentionally detached from the layer scope. A daemon started by another file outlives its runPromise; when bun switches to this file, the daemon's next reconnect calls new globalThis.WebSocket(...), which is now THIS file's class. The URL filter discriminates by the leaked daemon's config-time wsUrl (captured at fiber start). Expanded the inline comment to spell this out.
| // Polls until at least `count` WebSocket instances exist. Reconnect tests use | ||
| // this instead of a fixed wall-clock wait, which races the reconnect schedule | ||
| // once the machine is under load. | ||
| const waitForInstances = async (count: number, timeoutMs = 2000) => { | ||
| const startedAt = Date.now(); | ||
| while (FakeWebSocket.instances.length < count) { | ||
| if (Date.now() - startedAt > timeoutMs) { | ||
| throw new Error( | ||
| `Timed out waiting for ${count} WebSocket instances; saw ${FakeWebSocket.instances.length}`, | ||
| ); | ||
| } | ||
| await new Promise<void>((r) => setTimeout(r, 5)); | ||
| } | ||
| }; |
There was a problem hiding this comment.
Can't we test this properly with Effect's TestClock?
There was a problem hiding this comment.
Yes in principle. The reconnect Schedule.spaced runs on Effect's clock, so TestClock.adjust would advance it. The friction is the FakeWebSocket↔daemon bridge: triggerOpen/triggerClose dispatch synchronously and the daemon's listeners hop back via Runtime.runSync, so we'd still need yields between the external event and the Effect-driven reaction. Leaving the URL-keyed polling for this PR; happy to do the TestClock refactor as a follow-up.
| // One entry per WS channel: the demand map the cleanup pass drains, | ||
| // the cleanup cadence, and the cache eviction that runs before the | ||
| // channel is unsubscribed. |
There was a problem hiding this comment.
My reading comprehension is not sufficient enough to get what this means :P
There was a problem hiding this comment.
Rewritten. Now names the locals each entry carries (lastRequest, ttl/interval, onEvict) and what the loop below uses them for.
The module serves order books alongside asset contexts: /info accepts
{type: "l2Book", coins} and answers WS-first with a short wait for the
first frame and no REST fallback, returning null for an unseeded coin.
One WebSocket multiplexes both channels through a channel-keyed
registry, and reconnect listeners are detached on close so a reconnect
storm does not strand them. The book cache reuses the shared price
cache with a configurable wait timeout and a tryGetOrWait primitive
that evicts the waiter on timeout; the asset cache moves to a shared
freshness-cache. The ModuleService body parameter is now required.
…ules dxfeed, lo-tech, and pyth-lazer each hand-rolled the same getOrWaitPrice catch-evict-rethrow block; they now call the shared getOrWaitOrEvict primitive. chainlink-streams and pm-insights re-read the request body with request.clone().text() instead of the body the proxy already parsed; they now take the threaded body parameter. Behavior is unchanged.
Signed-off-by: kaynetik <aleksandar@nesovic.dev>
Drop Effect wrappers from the freshness cache's sync mutations, de-export `getOrWaitPrice` now that every production caller goes through the wrappers, and rework `forkIdleCleanup` so a failing onExpire keeps its entry forever and only escalates log severity. Replace the test's `Effect.sleep(0)` dance with a `waitForSocket` helper and add the missing subscribe-frame assertion to the seeded-snapshot l2Book test.
828cedd to
fa6de90
Compare
The hydromancer module serves order books now, alongside asset contexts.
/infoaccepts{"type": "l2Book", "coins": [...]}next to the existingassetContextshape. l2Book is answered WS-first: the request waits briefly for the first frame and returnsnullfor an unseeded coin, with no REST fallback, because books update many times a second and a fallback fetch would be stale before it returned. Both surfaces multiplex over one WebSocket on one credential.The WS code is built not to leak. One channel-keyed registry carries both channels, and reconnect listeners are detached on close so a reconnect storm cannot strand them. The book cache reuses the shared price cache with a short wait timeout and a
tryGetOrWaitprimitive that evicts the waiter once the wait times out.The second commit is cleanup the shared code makes obvious: dxfeed, lo-tech, and pyth-lazer drop their hand-rolled
getOrWaitPricecatch-and-evict block for the sharedgetOrWaitOrEvict, and chainlink-streams and pm-insights take the proxy-parsed body instead of re-reading the request.ModuleService.handleRequestnow requires the body parameter. Config field names are unchanged, so operatorconfig.jsonfiles keep working.Verified end to end against Hydromancer mainnet: both
assetContextandl2Bookreturn live data through a running proxy.