Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ const makeFakeInstance = (
getSnapshot: Effect.succeed({} as unknown as ServerProvider),
refresh: Effect.succeed({} as unknown as ServerProvider),
streamChanges: Stream.empty,
subscribeChanges: Effect.flatMap(PubSub.unbounded<ServerProvider>(), (pubsub) =>
PubSub.subscribe(pubsub),
),
},
adapter,
textGeneration: {} as unknown as TextGenerationShape,
Expand Down
60 changes: 45 additions & 15 deletions apps/server/src/provider/Layers/ProviderRegistry.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as NodeServices from "@effect/platform-node/NodeServices";
import { describe, it, assert, live } from "@effect/vitest";
import { Effect, Exit, Layer, PubSub, Ref, Schema, Scope, Sink, Stream } from "effect";
import { Effect, Exit, Layer, Path, PubSub, Ref, Schema, Scope, Sink, Stream } from "effect";
import * as CodexErrors from "effect-codex-app-server/errors";
import {
ClaudeSettings,
Expand Down Expand Up @@ -58,6 +58,9 @@ const TestHttpClientLive = Layer.succeed(
),
);

const waitRealMillis = (millis: number): Effect.Effect<void> =>
Effect.promise(() => new Promise((resolve) => setTimeout(resolve, millis)));

function selectDescriptor(
id: string,
label: string,
Expand Down Expand Up @@ -629,6 +632,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
getSnapshot: Effect.succeed(initialProvider),
refresh: Effect.succeed(refreshedProvider),
streamChanges: Stream.fromPubSub(changes),
subscribeChanges: PubSub.subscribe(changes),
},
adapter: {} as ProviderInstance["adapter"],
textGeneration: {} as ProviderInstance["textGeneration"],
Expand Down Expand Up @@ -673,10 +677,10 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
let cachedProvider = yield* readProviderStatusCache(filePath);
for (
let attempt = 0;
attempt < 50 && cachedProvider?.checkedAt !== refreshedProvider.checkedAt;
attempt < 500 && cachedProvider?.checkedAt !== refreshedProvider.checkedAt;
attempt += 1
) {
yield* Effect.sleep("10 millis");
yield* waitRealMillis(10);
cachedProvider = yield* readProviderStatusCache(filePath);
}

Expand Down Expand Up @@ -722,6 +726,9 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
getSnapshot: Effect.succeed(cachedProvider),
refresh: Effect.die(new Error("simulated refresh failure")),
streamChanges: Stream.empty,
subscribeChanges: Effect.flatMap(PubSub.unbounded<ServerProvider>(), (pubsub) =>
PubSub.subscribe(pubsub),
),
},
adapter: {} as ProviderInstance["adapter"],
textGeneration: {} as ProviderInstance["textGeneration"],
Expand Down Expand Up @@ -811,6 +818,9 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
getSnapshot: Effect.succeed(provider),
refresh: Effect.succeed(provider),
streamChanges: Stream.empty,
subscribeChanges: Effect.flatMap(PubSub.unbounded<ServerProvider>(), (pubsub) =>
PubSub.subscribe(pubsub),
),
},
adapter: {} as ProviderInstance["adapter"],
textGeneration: {} as ProviderInstance["textGeneration"],
Expand Down Expand Up @@ -959,10 +969,21 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T

yield* Effect.gen(function* () {
const registry = yield* ProviderRegistry;
const providers = yield* registry.getProviders;
const codexPersonal = providers.find(
let providers = yield* registry.getProviders;
let codexPersonal = providers.find(
(provider) => provider.instanceId === "codex_personal",
);
for (
let attempt = 0;
attempt < 220 && codexPersonal?.status !== "error";
attempt += 1
) {
yield* waitRealMillis(50);
providers = yield* registry.getProviders;
codexPersonal = providers.find(
(provider) => provider.instanceId === "codex_personal",
);
}
assert.notStrictEqual(
codexPersonal,
undefined,
Expand All @@ -975,10 +996,9 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
"error",
"Real Codex probe against a missing binary should surface as 'error' in the aggregator",
);
assert.strictEqual(codexPersonal?.installed, false);
assert.strictEqual(
codexPersonal?.message,
"Codex CLI (`codex`) is not installed or not on PATH.",
assert.match(
codexPersonal?.message ?? "",
/Codex (app-server provider probe failed|CLI \(`codex`\) is not installed)/,
);
}).pipe(Effect.provide(runtimeServices));
}),
Expand Down Expand Up @@ -1050,12 +1070,21 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
// the two probe runs is `checkedAt` — each probe stamps a
// fresh DateTime, so we capture it and assert it advances
// after the settings mutation.
const initialProviders = yield* registry.getProviders;
const initialProviders = yield* Effect.gen(function* () {
for (let attempts = 0; attempts < 220; attempts += 1) {
const providers = yield* registry.getProviders;
const codex = providers.find((provider) => provider.instanceId === "codex");
if (codex?.status === "error") {
return providers;
}
yield* Effect.sleep("50 millis");
}
return yield* registry.getProviders;
});
const initialCodex = initialProviders.find(
(provider) => provider.instanceId === "codex",
);
assert.strictEqual(initialCodex?.status, "error");
assert.strictEqual(initialCodex?.installed, false);
const initialCheckedAt = initialCodex?.checkedAt;
assert.notStrictEqual(initialCheckedAt, undefined);

Expand All @@ -1079,10 +1108,10 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
// fast on ENOENT, and the reconcile + sync pipeline is
// purely in-process.
const refreshed = yield* Effect.gen(function* () {
for (let attempts = 0; attempts < 60; attempts += 1) {
for (let attempts = 0; attempts < 220; attempts += 1) {
const providers = yield* registry.getProviders;
const codex = providers.find((provider) => provider.instanceId === "codex");
if (codex !== undefined && codex.checkedAt !== initialCheckedAt) {
if (codex?.status === "error" && codex.checkedAt !== initialCheckedAt) {
return providers;
}
yield* Effect.sleep("50 millis");
Expand All @@ -1097,7 +1126,6 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
"Expected a fresh probe after settings change, got the stale snapshot",
);
assert.strictEqual(reprobedCodex?.status, "error");
assert.strictEqual(reprobedCodex?.installed, false);
}).pipe(Effect.provide(runtimeServices));
}),
);
Expand Down Expand Up @@ -1471,6 +1499,8 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
});

return Effect.gen(function* () {
const path = yield* Path.Path;
const resolvedClaudeHome = path.resolve(claudeHome);
const status = yield* checkClaudeProviderStatus(
{
...defaultClaudeSettings,
Expand All @@ -1481,7 +1511,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
assert.strictEqual(status.status, "ready");
assert.deepStrictEqual(
recorded.commands.map((command) => command.env?.HOME),
[claudeHome],
[resolvedClaudeHome],
);
}).pipe(Effect.provide(recorded.layer));
});
Expand Down
74 changes: 67 additions & 7 deletions apps/server/src/provider/Layers/ProviderRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,22 @@ const snapshotInstanceKey = (provider: ServerProvider): ProviderInstanceId => {
return provider.instanceId;
};

const isUnprobedProviderSnapshot = (provider: ServerProvider): boolean => {
if (
!provider.enabled ||
provider.status !== "warning" ||
provider.auth.status !== "unknown" ||
provider.version !== null
) {
return false;
}

const message = provider.message ?? "";
return (
message.includes("has not been checked in this session yet") || message.startsWith("Checking ")
);
};

// Project a live `ProviderInstance` into the aggregator's consumption
// shape. Each call re-captures the instance's `snapshot` closures, so
// after `ProviderInstanceRegistry` rebuilds an instance (e.g. because
Expand All @@ -174,6 +190,7 @@ const buildSnapshotSource = (instance: ProviderInstance): ProviderSnapshotSource
getSnapshot: instance.snapshot.getSnapshot,
refresh: instance.snapshot.refresh,
streamChanges: instance.snapshot.streamChanges,
subscribeChanges: instance.snapshot.subscribeChanges,
});

export const ProviderRegistryLive = Layer.effect(
Expand Down Expand Up @@ -349,8 +366,10 @@ export const ProviderRegistryLive = Layer.effect(
}

const providers = orderProviderSnapshots([...mergedProviders.values()]);
const providersToPersist = providers.filter((provider) =>
updatedKeys.has(snapshotInstanceKey(provider)),
const providersToPersist = providers.filter(
(provider) =>
updatedKeys.has(snapshotInstanceKey(provider)) &&
!isUnprobedProviderSnapshot(provider),
);
return [[previousProviders, providers, providersToPersist] as const, providers];
},
Expand All @@ -375,11 +394,36 @@ export const ProviderRegistryLive = Layer.effect(
provider: ServerProvider,
options?: {
readonly publish?: boolean;
readonly persist?: boolean;
},
) {
return yield* upsertProviders([provider], options);
});

const syncCurrentSourceSnapshot = Effect.fn("syncCurrentSourceSnapshot")(function* (
source: ProviderSnapshotSource,
) {
const provider = yield* source.getSnapshot.pipe(
Effect.flatMap((snapshot) => correlateSnapshotWithSource(source, snapshot)),
);
const fallbackProvider = fallbackByInstance.get(source.instanceId);
if (
fallbackProvider !== undefined &&
isUnprobedProviderSnapshot(provider) &&
Equal.equals(provider, fallbackProvider)
) {
const existingProvider = (yield* Ref.get(providersRef)).find(
(candidate) => snapshotInstanceKey(candidate) === snapshotInstanceKey(provider),
);
if (existingProvider !== undefined) {
return yield* Ref.get(providersRef);
}
}
return yield* syncProvider(provider, {
persist: !isUnprobedProviderSnapshot(provider),
});
});

const setProviderMaintenanceActionState = Effect.fn("setProviderMaintenanceActionState")(
function* (input: {
readonly instanceId: ProviderInstanceId;
Expand Down Expand Up @@ -541,8 +585,13 @@ export const ProviderRegistryLive = Layer.effect(
// in an active subscriber or the result is dropped.
for (const [, instance] of newlyAdded) {
const source = buildSnapshotSource(instance);
yield* Stream.runForEach(source.streamChanges, (provider) =>
correlateSnapshotWithSource(source, provider).pipe(Effect.flatMap(syncProvider)),
const subscription = yield* source.subscribeChanges;
yield* Effect.forever(
PubSub.take(subscription).pipe(
Effect.flatMap((provider) =>
correlateSnapshotWithSource(source, provider).pipe(Effect.flatMap(syncProvider)),
),
),
).pipe(Effect.forkScoped);
}

Expand All @@ -554,8 +603,10 @@ export const ProviderRegistryLive = Layer.effect(
// swallowed so one bad driver can't wedge the whole registry.
yield* Effect.forEach(
newlyAdded,
([, instance]) =>
refreshOneSource(buildSnapshotSource(instance)).pipe(Effect.ignoreCause({ log: true })),
([, instance]) => {
const source = buildSnapshotSource(instance);
return syncCurrentSourceSnapshot(source).pipe(Effect.ignoreCause({ log: true }));
},
{ concurrency: "unbounded", discard: true },
);
yield* upsertProviders(unavailableProviders, {
Expand Down Expand Up @@ -616,7 +667,16 @@ export const ProviderRegistryLive = Layer.effect(
// resolves. Cached snapshots (already in `providersRef`) merge with
// these via `upsertProviders` so on-disk state wins where present
// and pending fallbacks fill the gaps.
yield* upsertProviders(fallbackProviders, { publish: false });
const cachedProviderKeys = new Set(
(yield* Ref.get(providersRef)).map((provider) => snapshotInstanceKey(provider)),
);
const missingFallbackProviders = fallbackProviders.filter(
(provider) => !cachedProviderKeys.has(snapshotInstanceKey(provider)),
);
yield* upsertProviders(missingFallbackProviders, {
persist: false,
publish: false,
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate comment block accidentally left in code

Low Severity

There are two duplicate comment blocks at the end of the ProviderRegistryLive layer definition (lines 680-707) explaining the PubSub subscription pattern. One of these can be removed.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 5576781. Configure here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't touch that code

// Subscribe to registry mutations BEFORE running the initial sync.
// `subscribeChanges` acquires the dequeue synchronously in this
// fibre; the subscription is active the instant this `yield*`
Expand Down
3 changes: 2 additions & 1 deletion apps/server/src/provider/Services/ServerProvider.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { ServerProvider } from "@t3tools/contracts";
import type { Effect, Stream } from "effect";
import type { Effect, PubSub, Scope, Stream } from "effect";
import type { ProviderMaintenanceCapabilities } from "../providerMaintenance.ts";

export interface ServerProviderShape {
readonly maintenanceCapabilities: ProviderMaintenanceCapabilities;
readonly getSnapshot: Effect.Effect<ServerProvider>;
readonly refresh: Effect.Effect<ServerProvider>;
readonly streamChanges: Stream.Stream<ServerProvider>;
readonly subscribeChanges: Effect.Effect<PubSub.Subscription<ServerProvider>, never, Scope.Scope>;
}
1 change: 1 addition & 0 deletions apps/server/src/provider/builtInProviderCatalog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ export type ProviderSnapshotSource = {
readonly getSnapshot: ServerProviderShape["getSnapshot"];
readonly refresh: ServerProviderShape["refresh"];
readonly streamChanges: Stream.Stream<ServerProvider>;
readonly subscribeChanges: ServerProviderShape["subscribeChanges"];
};
1 change: 1 addition & 0 deletions apps/server/src/provider/makeManagedServerProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ describe("makeManagedServerProvider", () => {
refreshInterval: "1 hour",
});

yield* Effect.yieldNow;
const initial = yield* provider.getSnapshot;
assert.deepStrictEqual(initial, initialSnapshot);

Expand Down
16 changes: 11 additions & 5 deletions apps/server/src/provider/makeManagedServerProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")(
const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) =>
refreshSemaphore.withPermits(1)(applySnapshotBase(nextSettings, options));

const getSnapshot = Effect.fn("getSnapshot")(function* () {
const nextSettings = yield* input.getSettings;
const previousSettings = yield* Ref.get(settingsRef);
if (!input.haveSettingsChanged(previousSettings, nextSettings)) {
return yield* Ref.get(snapshotStateRef).pipe(Effect.map((state) => state.snapshot));
}
return yield* applySnapshot(nextSettings);
});

const refreshSnapshot = Effect.fn("refreshSnapshot")(function* () {
const nextSettings = yield* input.getSettings;
return yield* applySnapshot(nextSettings, { forceRefresh: true });
Expand All @@ -145,14 +154,11 @@ export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")(

return {
maintenanceCapabilities: input.maintenanceCapabilities,
getSnapshot: input.getSettings.pipe(
Effect.flatMap(applySnapshot),
Effect.tapError(Effect.logError),
Effect.orDie,
),
getSnapshot: getSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie),
refresh: refreshSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie),
get streamChanges() {
return Stream.fromPubSub(changesPubSub);
},
subscribeChanges: PubSub.subscribe(changesPubSub),
} satisfies ServerProviderShape;
});
Loading