feat(hydromancer): add l2Book WebSocket order-book surface #127

Open
jaspersagent wants to merge 4 commits into sedaprotocol:main from jaspersagent:feat/hydromancer-l2book

Contributor

jaspersagent commented May 21, 2026

The hydromancer module serves order books now, alongside asset contexts.

/info accepts {"type": "l2Book", "coins": [...]} next to the existing assetContext shape. l2Book is answered WS-first: the request waits briefly for the first frame and returns null for an unseeded coin, with no REST fallback, because books update many times a second and a fallback fetch would be stale before it returned. Both surfaces multiplex over one WebSocket on one credential.

The WS code is built not to leak. One channel-keyed registry carries both channels, and reconnect listeners are detached on close so a reconnect storm cannot strand them. The book cache reuses the shared price cache with a short wait timeout and a tryGetOrWait primitive that evicts the waiter once the wait times out.
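
For readers unfamiliar with the wait-then-evict idea, here is a minimal sketch of what a tryGetOrWait-style primitive could look like. It is plain TypeScript with Promises rather than the Effect-based shared price cache, and `WaitCache`, `Waiter`, and `pendingWaiters` are hypothetical names used only for illustration. The point it shows: a request for an unseeded key parks a waiter, and if no frame arrives before the timeout the waiter is removed and the caller gets null.

```typescript
// Hypothetical sketch: names echo the PR text, not the real price-cache module.
type Waiter<V> = (value: V) => void;

class WaitCache<K, V> {
  private readonly values = new Map<K, V>();
  private readonly waiters = new Map<K, Set<Waiter<V>>>();

  // Store a value and wake every waiter parked on this key.
  set(key: K, value: V): void {
    this.values.set(key, value);
    const parked = this.waiters.get(key);
    if (parked === undefined) return;
    this.waiters.delete(key);
    parked.forEach((wake) => wake(value));
  }

  // Resolve with the cached value, or wait up to `timeoutMs` for the first
  // one. On timeout the waiter is removed and the result is null.
  tryGetOrWait(key: K, timeoutMs: number): Promise<V | null> {
    const cached = this.values.get(key);
    if (cached !== undefined) return Promise.resolve(cached);

    return new Promise<V | null>((resolve) => {
      const parked = this.waiters.get(key) ?? new Set<Waiter<V>>();
      this.waiters.set(key, parked);

      const waiter: Waiter<V> = (value) => {
        clearTimeout(timer);
        resolve(value);
      };
      const timer = setTimeout(() => {
        // Evict the waiter so repeated timeouts cannot accumulate.
        parked.delete(waiter);
        if (parked.size === 0 && this.waiters.get(key) === parked) {
          this.waiters.delete(key);
        }
        resolve(null);
      }, timeoutMs);

      parked.add(waiter);
    });
  }

  // Exposed only so the sketch can show that timed-out waiters are gone.
  pendingWaiters(key: K): number {
    return this.waiters.get(key)?.size ?? 0;
  }
}
```

In this sketch a caller would do something like `await cache.tryGetOrWait("BTC", 250)` and map a null result to the null book entry; the 250 ms figure is an arbitrary placeholder, not the module's configured timeout.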

The second commit is cleanup the shared code makes obvious: dxfeed, lo-tech, and pyth-lazer drop their hand-rolled getOrWaitPrice catch-and-evict block for the shared getOrWaitOrEvict, and chainlink-streams and pm-insights take the proxy-parsed body instead of re-reading the request. ModuleService.handleRequest now requires the body parameter. Config field names are unchanged, so operator config.json files keep working.

Verified end to end against Hydromancer mainnet: both assetContext and l2Book return live data through a running proxy.

@jaspersagent jaspersagent force-pushed the feat/hydromancer-l2book branch 2 times, most recently from 897d716 to e89a9e7 Compare May 21, 2026 14:40
@jasperdg jasperdg requested a review from Thomasvdam May 22, 2026 12:06
Comment on lines +426 to +427
yield* Effect.sleep(Duration.millis(0));
yield* Effect.sleep(Duration.millis(0));
Member

Why do we need this? Looks to me like the flush we had before also didn't do anything.

Contributor Author

jaspersagent commented May 23, 2026

You're right that the old flush() was a no-op for tests that only awaited the runPromise result. These sleeps were not, because we now observe FakeWebSocket.instances[0] from inside the program: the daemon owns the socket and the layer scope tears it down on exit, so the forkDaemon fiber has to be scheduled before the first new WebSocket(...) is visible.

Replaced with a waitForSocket polling helper + Effect.yieldNow(), so the reason for each wait is explicit.

Comment on lines +626 to +630
it("returns the seeded snapshot for a pre-subscribed coin", async () => {
  const config: HydromancerModuleConfig = {
    ...baseConfig,
    l2BookSubscriptionCoins: ["BTC"],
  };
Member

This doesn't test that it actually sends a subscription message for the initial coins.

Contributor Author

Good catch. Added an explicit expect(sentOnOpen).toEqual([buildSubscribeFrame("l2Book", "BTC")]) before the response check; the test now fails if the subscribe pipeline becomes a no-op.

Comment on lines +35 to +36
// Consecutive onExpire failures per key. A key that keeps failing is
// force-removed once it reaches the cap so the map cannot grow forever.
Member

Ok great, now the map won't grow forever but whichever other process relied on this callback to clean up resources is now most likely leaking something that's probably bigger than a key value pair in a map.

Maybe it makes more sense to use this counter to space out retries? For the first X attempts we retry and log a warning; after X we log an error and retry every Y (or with some capped exponential backoff).

Contributor Author

Agreed. Reworked: a failing onExpire keeps its entry forever and the counter only escalates log severity (warn for the first 3, error after). Retries happen on every cleanup pass at the configured interval, so a transient downstream failure resolves itself. Added a test that drives 5 failing passes and asserts the entry persists.
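
To make the reworked behaviour concrete, here is a rough sketch of one cleanup pass, written as plain TypeScript rather than the Effect-based forkIdleCleanup. `runCleanupPass`, `IdleEntry`, and `WARN_LIMIT` are hypothetical names, and the synchronous `onExpire` is an assumption made to keep the example short. It illustrates the two properties described above: a failing entry is never dropped, and the failure counter only changes the log level.

```typescript
// Hypothetical sketch; not the module's actual cleanup loop.
type LogLevel = "warn" | "error";

interface IdleEntry {
  lastRequest: number; // ms timestamp of the most recent demand
  failures: number; // consecutive onExpire failures
}

const WARN_LIMIT = 3; // failures 1..3 log at warn, later ones at error

function runCleanupPass<K>(
  entries: Map<K, IdleEntry>,
  now: number,
  ttlMs: number,
  onExpire: (key: K) => void,
  log: (level: LogLevel, message: string) => void,
): void {
  entries.forEach((entry, key) => {
    if (now - entry.lastRequest < ttlMs) return; // still in demand
    try {
      onExpire(key);
      entries.delete(key); // downstream cleanup succeeded, safe to forget
    } catch (cause) {
      entry.failures += 1;
      const level: LogLevel = entry.failures <= WARN_LIMIT ? "warn" : "error";
      log(level, `onExpire failed for ${String(key)} (attempt ${entry.failures}): ${String(cause)}`);
      // The entry stays in the map, so the next pass retries it.
    }
  });
}
```

Driving this sketch through five passes with an `onExpire` that always throws leaves the entry in the map and logs warn three times, then error twice; a later pass with a succeeding `onExpire` removes it.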

Comment on lines +121 to +126
const getOrWaitOrEvict = (key: K) =>
  getOrWaitPrice(key).pipe(
    Effect.catchTag("FailedToGetPriceError", (error) =>
      Effect.zipRight(deletePrice(key), Effect.fail(error)),
    ),
  );
Member

I think this replaces all uses of getOrWaitPrice, so we can clean that up.

Contributor Author

Done. getOrWaitPrice is a closure-local helper now, not exported. Rewrote price-cache.test.ts against the wrappers (tryGetOrWait, getOrWaitOrEvict); the waiter mechanism is still fully covered via the public surface.

Comment on lines +31 to +32
Effect.sync(() => {
  MutableHashMap.set(entries, key, { value, lastUpdate: now });
Member

Is there a reason to make these functions effects?

Contributor Author

jaspersagent commented May 23, 2026

No good reason. They were all Effect.sync. Dropped the wrappers: set/get/remove are plain sync functions now, callers stopped yielding on them. The factory still returns Effect.Effect<FreshnessCache<K,V>> so tests construct it the same way.
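
As an illustration of the resulting shape, a freshness cache with plain synchronous set/get/remove might look roughly like the sketch below. This is not the code in the PR: it uses a standard Map instead of MutableHashMap, returns `undefined` where the real cache returns `None`, and skips the Effect-returning factory. `FreshnessCacheSketch`, `makeFreshnessCache`, `maxAgeMs`, and the injectable `now` clock are all hypothetical.

```typescript
// Hypothetical sketch: plain TypeScript, `undefined` stands in for `None`.
interface FreshnessCacheSketch<K, V> {
  set(key: K, value: V): void;
  get(key: K): V | undefined;
  remove(key: K): void;
}

function makeFreshnessCache<K, V>(
  maxAgeMs: number,
  now: () => number = Date.now,
): FreshnessCacheSketch<K, V> {
  const entries = new Map<K, { value: V; lastUpdate: number }>();
  return {
    // Record the value together with the time it was written.
    set: (key, value) => {
      entries.set(key, { value, lastUpdate: now() });
    },
    // Return the value only while it is fresh; stale or absent keys yield
    // undefined so the caller can fall back to a pull such as a REST fetch.
    get: (key) => {
      const entry = entries.get(key);
      if (entry === undefined) return undefined;
      return now() - entry.lastUpdate <= maxAgeMs ? entry.value : undefined;
    },
    remove: (key) => {
      entries.delete(key);
    },
  };
}
```

Injecting the clock is a choice made for this sketch so that staleness can be tested without real waits; nothing in the PR says the real cache takes one.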

* stale or absent key returns `None`, leaving the caller free to fall back
* to a pull such as a REST fetch. It has no waiters and no timeouts.
*/
export interface FreshnessCache<K, V> {
Member

I don't think we're using this interface outside of this file?

Contributor Author

jaspersagent commented May 23, 2026

It is. ws-client.ts:127 declares createHydromancerWS(config, assetCache: FreshnessCache<string, AssetCtx>, ...), so the type import is genuinely consumed. Keeping it exported.

Comment on lines +162 to +167
// Only record sockets this test file opened. A leaked WS daemon from
// another test file also constructs FakeWebSockets; filtering by URL
// keeps those out of `instances` so the reconnect tests stay reliable.
if (url.includes("wsclient.test")) {
  FakeWebSocket.instances.push(this);
}
Member

I'm not sure how this would happen, given that each test file seems to define their own fake WS class.

Contributor Author

Each file has its own FakeWebSocket class, but createHydromancerWS forks its connect loop via Effect.forkDaemon, which is intentionally detached from the layer scope. A daemon started by another file outlives its runPromise; when Bun switches to this file, the daemon's next reconnect calls new globalThis.WebSocket(...), which by then resolves to this file's class. The URL filter tells the two apart using the leaked daemon's config-time wsUrl (captured at fiber start). Expanded the inline comment to spell this out.

Comment on lines +202 to +215
// Polls until at least `count` WebSocket instances exist. Reconnect tests use
// this instead of a fixed wall-clock wait, which races the reconnect schedule
// once the machine is under load.
const waitForInstances = async (count: number, timeoutMs = 2000) => {
  const startedAt = Date.now();
  while (FakeWebSocket.instances.length < count) {
    if (Date.now() - startedAt > timeoutMs) {
      throw new Error(
        `Timed out waiting for ${count} WebSocket instances; saw ${FakeWebSocket.instances.length}`,
      );
    }
    await new Promise<void>((r) => setTimeout(r, 5));
  }
};
Member

Can't we test this properly with Effect's TestClock?

Contributor Author

Yes, in principle. The reconnect Schedule.spaced runs on Effect's clock, so TestClock.adjust would advance it. The friction is the FakeWebSocket↔daemon bridge: triggerOpen/triggerClose dispatch synchronously and the daemon's listeners hop back via Runtime.runSync, so we'd still need yields between the external event and the Effect-driven reaction. Leaving the URL-keyed polling in place for this PR; happy to do the TestClock refactor as a follow-up.

Comment on lines +42 to +44
// One entry per WS channel: the demand map the cleanup pass drains,
// the cleanup cadence, and the cache eviction that runs before the
// channel is unsubscribed.
Member

My reading comprehension is not sufficient enough to get what this means :P

Contributor Author

Rewritten. Now names the locals each entry carries (lastRequest, ttl/interval, onEvict) and what the loop below uses them for.

jaspersagent and others added 4 commits May 24, 2026 07:20
The module serves order books alongside asset contexts: /info accepts
{type: "l2Book", coins} and answers WS-first with a short wait for the
first frame and no REST fallback, returning null for an unseeded coin.
One WebSocket multiplexes both channels through a channel-keyed
registry, and reconnect listeners are detached on close so a reconnect
storm does not strand them. The book cache reuses the shared price
cache with a configurable wait timeout and a tryGetOrWait primitive
that evicts the waiter on timeout; the asset cache moves to a shared
freshness-cache. The ModuleService body parameter is now required.
…ules

dxfeed, lo-tech, and pyth-lazer each hand-rolled the same getOrWaitPrice
catch-evict-rethrow block; they now call the shared getOrWaitOrEvict
primitive. chainlink-streams and pm-insights re-read the request body
with request.clone().text() instead of the body the proxy already
parsed; they now take the threaded body parameter. Behavior is
unchanged.
Signed-off-by: kaynetik <aleksandar@nesovic.dev>
Drop Effect wrappers from the freshness cache's sync mutations, de-export
`getOrWaitPrice` now that every production caller goes through the wrappers,
and rework `forkIdleCleanup` so a failing onExpire keeps its entry forever
and only escalates log severity. Replace the test's `Effect.sleep(0)` dance
with a `waitForSocket` helper and add the missing subscribe-frame assertion
to the seeded-snapshot l2Book test.
@kaynetik kaynetik force-pushed the feat/hydromancer-l2book branch from 828cedd to fa6de90 Compare May 24, 2026 05:22