Skip to content

Commit 1381c9d

Browse files
committed
feat(webapp): make LLM pricing pub/sub subscription opt-in per process
Subscribing every replica to the reload channel — admin dashboards, workers, anything that imports the registry — fans out a full-table reload across processes that don't actually need real-time pricing freshness; for those, the existing 5-minute interval is sufficient. Add LLM_PRICING_RELOAD_PUBSUB_ENABLED (default: true). Set it to false on non-OTel services in multi-service deployments so that only the span-ingesting processes subscribe and reload on publish. Defaulting to true preserves current behavior for single-service self-hosted deployments without any env tuning.
1 parent 1699582 commit 1381c9d

2 files changed

Lines changed: 63 additions & 50 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,6 +1426,12 @@ const EnvironmentSchema = z
14261426
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes
14271427
LLM_PRICING_RELOAD_CHANNEL: z.string().default("llm-registry:reload"),
14281428
LLM_PRICING_RELOAD_DEBOUNCE_MS: z.coerce.number().int().default(1000),
1429+
// Whether to subscribe this process to the LLM_PRICING_RELOAD_CHANNEL.
1430+
// Defaults to true so single-service self-hosted deployments work without
1431+
// tuning. In multi-service deployments, set this to false on services
1432+
// that don't ingest spans (dashboard, workers) — only the OTel-ingesting
1433+
// services need the registry to reload in real time.
1434+
LLM_PRICING_RELOAD_PUBSUB_ENABLED: BoolEnv.default(true),
14291435
LLM_PRICING_SEED_ON_STARTUP: BoolEnv.default(false),
14301436
LLM_PRICING_READY_TIMEOUT_MS: z.coerce.number().int().default(500),
14311437
LLM_METRICS_BATCH_SIZE: z.coerce.number().int().default(5000),

apps/webapp/app/v3/llmPricingRegistry.server.ts

Lines changed: 57 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -37,62 +37,69 @@ export const llmPricingRegistry = singleton("llmPricingRegistry", () => {
3737
});
3838
}, reloadInterval);
3939

40-
// Pub/sub reload — billing's LLM registry worker publishes on this channel
41-
// immediately after writing new/changed model rows, so all webapp pods see
42-
// updates within ~1s instead of waiting for the next interval tick.
43-
const subscriber = createRedisClient("llm-pricing:subscriber", {
44-
keyPrefix: "llm-pricing:subscriber:",
45-
host: env.COMMON_WORKER_REDIS_HOST,
46-
port: env.COMMON_WORKER_REDIS_PORT,
47-
username: env.COMMON_WORKER_REDIS_USERNAME,
48-
password: env.COMMON_WORKER_REDIS_PASSWORD,
49-
tlsDisabled: env.COMMON_WORKER_REDIS_TLS_DISABLED === "true",
50-
clusterMode: env.COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED === "1",
51-
});
40+
// Pub/sub reload is opt-in per process. Without it, the registry stays
41+
// accurate via the existing 5-minute interval. In multi-service deployments
42+
// we only want the OTel-ingesting services subscribed — the dashboard and
43+
// worker services don't need real-time pricing freshness and shouldn't pile
44+
// onto each publish with a full-table reload.
45+
if (env.LLM_PRICING_RELOAD_PUBSUB_ENABLED) {
46+
const subscriber = createRedisClient("llm-pricing:subscriber", {
47+
keyPrefix: "llm-pricing:subscriber:",
48+
host: env.COMMON_WORKER_REDIS_HOST,
49+
port: env.COMMON_WORKER_REDIS_PORT,
50+
username: env.COMMON_WORKER_REDIS_USERNAME,
51+
password: env.COMMON_WORKER_REDIS_PASSWORD,
52+
tlsDisabled: env.COMMON_WORKER_REDIS_TLS_DISABLED === "true",
53+
clusterMode: env.COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED === "1",
54+
});
5255

53-
subscriber.subscribe(env.LLM_PRICING_RELOAD_CHANNEL).catch((err) => {
54-
logger.warn("Failed to subscribe to LLM pricing reload channel", {
55-
channel: env.LLM_PRICING_RELOAD_CHANNEL,
56-
error: err instanceof Error ? err.message : String(err),
56+
subscriber.subscribe(env.LLM_PRICING_RELOAD_CHANNEL).catch((err) => {
57+
logger.warn("Failed to subscribe to LLM pricing reload channel", {
58+
channel: env.LLM_PRICING_RELOAD_CHANNEL,
59+
error: err instanceof Error ? err.message : String(err),
60+
});
5761
});
58-
});
5962

60-
// Coalesce reload calls so a burst of publishes only triggers one reload.
61-
// A reload always fires within LLM_PRICING_RELOAD_DEBOUNCE_MS of the first
62-
// publish in a burst; subsequent publishes during that window are no-ops
63-
// because the trailing-edge reload will pick up everything when it queries
64-
// the DB. Bounds reload rate to at most 1 / debounce-window regardless of
65-
// how chatty the publisher is.
66-
const debounceMs = env.LLM_PRICING_RELOAD_DEBOUNCE_MS;
67-
let pendingReloadTimer: NodeJS.Timeout | null = null;
68-
69-
function scheduleReload() {
70-
if (pendingReloadTimer) return;
71-
pendingReloadTimer = setTimeout(() => {
72-
pendingReloadTimer = null;
73-
registry.reload().catch((err) => {
74-
logger.warn("Failed to reload LLM pricing registry from pub/sub", {
75-
error: err instanceof Error ? err.message : String(err),
63+
// Coalesce reload calls so a burst of publishes only triggers one
64+
// reload. The first publish schedules a reload at
65+
// T+LLM_PRICING_RELOAD_DEBOUNCE_MS; subsequent publishes during that
66+
// window are no-ops because the trailing reload picks up everything
67+
// when it queries the DB. Bounds reload rate to at most 1 per debounce
68+
// window regardless of publisher chattiness.
69+
const debounceMs = env.LLM_PRICING_RELOAD_DEBOUNCE_MS;
70+
let pendingReloadTimer: NodeJS.Timeout | null = null;
71+
72+
function scheduleReload() {
73+
if (pendingReloadTimer) return;
74+
pendingReloadTimer = setTimeout(() => {
75+
pendingReloadTimer = null;
76+
registry.reload().catch((err) => {
77+
logger.warn("Failed to reload LLM pricing registry from pub/sub", {
78+
error: err instanceof Error ? err.message : String(err),
79+
});
7680
});
77-
});
78-
}, debounceMs);
79-
}
81+
}, debounceMs);
82+
}
8083

81-
subscriber.on("message", (channel) => {
82-
if (channel !== env.LLM_PRICING_RELOAD_CHANNEL) return;
83-
scheduleReload();
84-
});
84+
subscriber.on("message", (channel) => {
85+
if (channel !== env.LLM_PRICING_RELOAD_CHANNEL) return;
86+
scheduleReload();
87+
});
8588

86-
signalsEmitter.on("SIGTERM", () => {
87-
clearInterval(interval);
88-
if (pendingReloadTimer) clearTimeout(pendingReloadTimer);
89-
void subscriber.quit().catch(() => {});
90-
});
91-
signalsEmitter.on("SIGINT", () => {
92-
clearInterval(interval);
93-
if (pendingReloadTimer) clearTimeout(pendingReloadTimer);
94-
void subscriber.quit().catch(() => {});
95-
});
89+
signalsEmitter.on("SIGTERM", () => {
90+
clearInterval(interval);
91+
if (pendingReloadTimer) clearTimeout(pendingReloadTimer);
92+
void subscriber.quit().catch(() => {});
93+
});
94+
signalsEmitter.on("SIGINT", () => {
95+
clearInterval(interval);
96+
if (pendingReloadTimer) clearTimeout(pendingReloadTimer);
97+
void subscriber.quit().catch(() => {});
98+
});
99+
} else {
100+
signalsEmitter.on("SIGTERM", () => clearInterval(interval));
101+
signalsEmitter.on("SIGINT", () => clearInterval(interval));
102+
}
96103

97104
return registry;
98105
});

0 commit comments

Comments (0)