Skip to content

Commit 684fc2e

Browse files
committed
chore(webapp): trim per-org-basin comments
Pass over the basin-related code to drop running commentary and cloud-product-specific phrasing. No behaviour change. - streamBasinProvisioner.server.ts: shorter module + helper docblocks; drop stale "synchronous org-create call site" rationale that no longer applies after the paid-only refactor. - streamBasinRetentionByPlan.server.ts: tighter module doc; collapse the reconcile-transitions narrative into a short table; drop the "cloud-flavored" framing. - v1StreamsGlobal.server.ts: short doc on resolveStreamBasin; drop references to specific operational state. - env.server.ts: terse one-liner per env var; drop sample basin name example. - platform.v3.server.ts, commonWorker.server.ts, runEngine/types.ts + index.ts, triggerTask.server.ts, api.v1.sessions.ts, the two admin routes and the two row-optional session-channel handlers: drop inline rationale paragraphs that re-explained the reconciler / read-precedence chain at every call site.
1 parent f55746d commit 684fc2e

14 files changed

Lines changed: 78 additions & 322 deletions

apps/webapp/app/env.server.ts

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,8 @@ import { isValidDatabaseUrl } from "./utils/db";
55
import { isValidRegex } from "./utils/regex";
66
import { isValidDuration } from "./services/realtime/duration.server";
77

8-
/**
9-
* `z.string()` constrained to a duration string parseable by
10-
* `parseDuration` (e.g. `7d`, `30d`, `365d`, `1h`). Validated at boot
11-
* so a typo'd retention env var fails fast at startup rather than
12-
* lurking until the first basin operation.
13-
*/
8+
// `z.string()` constrained to a `parseDuration`-parseable string (e.g.
9+
// `7d`, `1h`). Validated at boot so a typo'd duration fails fast.
1410
function durationString() {
1511
return z
1612
.string()
@@ -1519,33 +1515,20 @@ const EnvironmentSchema = z
15191515
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
15201516
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
15211517
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
1522-
/// Per-org basin migration. When "true", the webapp provisions a
1523-
/// dedicated S2 basin per org with plan-tied retention and stamps
1524-
/// `streamBasinName` on new TaskRun / Session rows. OSS / s2-lite
1525-
/// installs leave this off and keep using the single basin defined
1526-
/// by `REALTIME_STREAMS_S2_BASIN`.
1518+
// When "true", provision a dedicated S2 basin per org and stamp
1519+
// `streamBasinName` on new rows. Off keeps everything on the single
1520+
// basin defined by `REALTIME_STREAMS_S2_BASIN`.
15271521
REALTIME_STREAMS_PER_ORG_BASINS_ENABLED: z.enum(["true", "false"]).default("false"),
1528-
/// Naming pattern for per-org basins: `{prefix}-{env}-org-{slug}`
1529-
/// e.g. `triggerdotdev-prod-org-acme-corp`. Cluster + tier shorthand
1530-
/// — kept short to stay under S2's basin-name length limit.
1522+
// Per-org basin name = `{prefix}-{env}-org-{orgId}`.
15311523
REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"),
15321524
REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"),
1533-
/// Default retention for new basins (S2 duration syntax: 7d / 30d / 1y).
1534-
/// Used at org-create and as the fallback when no plan-specific
1535-
/// retention is resolved. Operators that don't run a billing API
1536-
/// only need this one.
15371525
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: durationString().default("30d"),
1538-
/// Plan-specific retention overrides — only consulted by the
1539-
/// optional `streamBasinRetentionByPlan` shim. Operators that
1540-
/// don't map plans to retention (OSS, self-hosted) can ignore
1541-
/// these and rely on the default above.
1526+
// Plan-specific retention overrides consulted by the
1527+
// streamBasinRetentionByPlan shim only.
15421528
REALTIME_STREAMS_BASIN_RETENTION_FREE: durationString().default("7d"),
15431529
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: durationString().default("30d"),
15441530
REALTIME_STREAMS_BASIN_RETENTION_PRO: durationString().default("365d"),
1545-
/// Storage class applied to per-org basins at create time.
15461531
REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
1547-
/// `delete_on_empty_min_age` applied to per-org basins. Streams
1548-
/// that go empty for this long are reaped automatically.
15491532
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: durationString().default("1h"),
15501533
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
15511534
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),

apps/webapp/app/routes/admin.api.v1.stream-basins.backfill.ts

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,9 @@ import { commonWorker } from "~/v3/commonWorker.server";
77
import { logger } from "~/services/logger.server";
88

99
/**
10-
* One-shot backfill that enqueues `v3.reconcileStreamBasinForOrg` for
11-
* every non-deleted org. The reconciler decides per-org what to do:
12-
* provision a basin for paid orgs that don't have one, reconfigure
13-
* retention for paid orgs whose tier changed, deprovision (null the
14-
* column) for free orgs that were mistakenly provisioned. Idempotent
15-
* — re-running converges to the desired state.
16-
*
17-
* - Admin auth via `requireAdminApiRequest` (PAT in `Authorization`).
18-
* - Refuses to run when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false`
19-
* so OSS / s2-lite installs can't accidentally trigger basin
20-
* operations against a misconfigured backend.
21-
* - `dryRun=true` (default false) returns the count without enqueueing.
22-
* - `limit` (default 1000, max 10000) caps a single invocation. To
23-
* page through more orgs than `limit`, pass `afterOrgId` from the
24-
* previous response's `nextAfterOrgId`.
25-
* - Each job is keyed `reconcileStreamBasin:<orgId>` so concurrent
26-
* calls converge to one job per org.
10+
* Backfill: enqueue `v3.reconcileStreamBasinForOrg` for every
11+
* non-deleted org. Idempotent. Page through `>limit` orgs by passing
12+
* `afterOrgId` from the previous response's `nextAfterOrgId`.
2713
*/
2814

2915
const BodySchema = z
@@ -73,11 +59,8 @@ export async function action({ request }: ActionFunctionArgs) {
7359

7460
const { dryRun, limit, afterOrgId } = parsed;
7561

76-
// Walk every non-deleted org. The reconcile worker is fast for the
77-
// no-op case (free with null column) so enqueueing for all is fine
78-
// — saves us from doing per-org billing lookups here just to filter
79-
// candidates. Cursor on `id` (cuid is sortable) gives stable paging
80-
// across calls; `createdAt` ties get broken by the cursor.
62+
// Reconcile is fast for the no-op case, so we enqueue for all orgs
63+
// rather than filter on plan here.
8164
const candidates = await prisma.organization.findMany({
8265
where: { deletedAt: null },
8366
orderBy: { id: "asc" },
@@ -89,9 +72,6 @@ export async function action({ request }: ActionFunctionArgs) {
8972
const lastReturnedId = candidates[candidates.length - 1]?.id;
9073
const nextAfterOrgId = candidates.length === limit && lastReturnedId ? lastReturnedId : null;
9174

92-
// Orgs still beyond the cursor we just returned. On the final page,
93-
// `lastReturnedId` is undefined (empty result) or the response is short
94-
// of `limit`, so this is 0 — exactly what the caller needs to stop.
9575
const remaining = lastReturnedId
9676
? await prisma.organization.count({
9777
where: { deletedAt: null, id: { gt: lastReturnedId } },
@@ -128,10 +108,6 @@ export async function action({ request }: ActionFunctionArgs) {
128108
}
129109
}
130110

131-
// `remaining` counts orgs strictly past the cursor returned to the
132-
// caller. Enqueue failures don't change this — re-running with the
133-
// same `afterOrgId` would page through the same window and the
134-
// per-org idempotency key keeps it safe.
135111
const response: BackfillResponse = {
136112
ok: true,
137113
dryRun: false,
@@ -151,8 +127,7 @@ export async function action({ request }: ActionFunctionArgs) {
151127
return json(response);
152128
}
153129

154-
// GET returns the current state without doing anything — useful for
155-
// monitoring "is the backfill done yet?" from a dashboard / curl.
130+
// GET: read-only progress — orgs with vs without a basin stamped.
156131
export async function loader({ request }: ActionFunctionArgs) {
157132
await requireAdminApiRequest(request);
158133

apps/webapp/app/routes/admin.api.v1.stream-basins.reconfigure.ts

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,14 @@ import {
99
import { commonWorker } from "~/v3/commonWorker.server";
1010

1111
/**
12-
* Admin trigger for stream-basin reconfiguration. The plan-change path
13-
* in `setPlan` enqueues the same reconcile job automatically when
14-
* billing is wired; this route exists for ops + e2e testing.
12+
* Admin route for forcing a basin reconfigure for an org. Two modes:
1513
*
16-
* - Default (`{ orgId }`): enqueues `v3.reconcileStreamBasinForOrg`,
17-
* the full reconciler. It resolves retention from the org's current
18-
* plan and either provisions, reconfigures, or deprovisions the basin
19-
* to match — including nulling `streamBasinName` if the org is now on
20-
* a free plan. No-op when billing isn't configured (OSS) or when
21-
* `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false`.
22-
* - With `retention`: skips the worker queue and the reconciler entirely.
23-
* Calls `reconfigureBasinForOrg` inline with the given duration string
24-
* (e.g. `"7d"`, `"30d"`, `"365d"`, `"1y"`). Useful for validating the
25-
* PATCH wire shape end-to-end and as a manual override (e.g.
26-
* enterprise contracts) — does NOT touch the column or check the plan.
14+
* - `{ orgId }`: enqueues `v3.reconcileStreamBasinForOrg` (the full
15+
* reconciler). May provision, reconfigure, or deprovision based on
16+
* the org's current plan.
17+
* - `{ orgId, retention }`: bypasses the reconciler and PATCHes the
18+
* basin retention inline against the given duration. Doesn't touch
19+
* the column or check the plan.
2720
*/
2821
const BodySchema = z
2922
.object({
@@ -58,9 +51,6 @@ export async function action({ request }: ActionFunctionArgs) {
5851
}
5952

6053
if (parsed.data.retention) {
61-
// Direct, synchronous reconfigure with the explicit retention.
62-
// Skips the worker queue + billing lookup so the PATCH is
63-
// verifiable in the response. Errors surface as 500.
6454
await reconfigureBasinForOrg(parsed.data.orgId, parsed.data.retention);
6555
return json({
6656
ok: true,

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -123,19 +123,10 @@ const { action, loader } = createActionApiRoute(
123123
// and remove the pending registration.
124124
if (!result.isCached) {
125125
try {
126-
// Session streams are always v2 (S2) — the writer in
127-
// `appendPartToSessionStream` and the SSE subscribe both
128-
// hardcode "v2", so the race-check reader has to match.
129-
// Don't fall through to the run's own `realtimeStreamsVersion`,
130-
// which only describes the run's run-scoped streams.
131-
//
132-
// Resolve basin from `session` only (not `run`). The append-side
133-
// writer in `realtime.v1.sessions.$session.$io.append.ts` passes
134-
// only `{ session }`, and `resolveStreamBasin` prefers `run` over
135-
// `session` when both are present. During the per-org-basin
136-
// migration window, `run.streamBasinName` and
137-
// `session.streamBasinName` can differ — the writes land in the
138-
// session's basin, so the race-check has to read from the same.
126+
// Session streams are hardcoded v2 by the append-side writer
127+
// and SSE subscribe, so the race-check reader matches. Basin
128+
// comes from `session` only — the writer side passes the same
129+
// and we have to read from the same basin to find the record.
139130
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
140131
session: maybeSession,
141132
});

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,6 @@ const { action } = createActionApiRoute(
167167
runtimeEnvironmentId: authentication.environment.id,
168168
environmentType: authentication.environment.type,
169169
organizationId: authentication.environment.organizationId,
170-
// Stamp the org's S2 basin so realtime reads on this
171-
// session's `.in/.out` channels resolve without joining
172-
// Organization. Null until per-org basins are provisioned.
173170
streamBasinName: authentication.environment.organization.streamBasinName,
174171
},
175172
update: { triggerConfig: triggerConfigJson },
@@ -190,9 +187,6 @@ const { action } = createActionApiRoute(
190187
runtimeEnvironmentId: authentication.environment.id,
191188
environmentType: authentication.environment.type,
192189
organizationId: authentication.environment.organizationId,
193-
// Stamp the org's S2 basin so realtime reads on this
194-
// session's `.in/.out` channels resolve without joining
195-
// Organization. Null until per-org basins are provisioned.
196190
streamBasinName: authentication.environment.organization.streamBasinName,
197191
},
198192
});

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,9 @@ const { action } = createActionApiRoute(
5959
});
6060
}
6161

62-
// When the row is missing (externalId form, row not yet upserted),
63-
// default to the org's current basin instead of falling through to
64-
// the legacy global. A fresh session row would be stamped with the
65-
// org's basin at creation time, so subsequent appends/subscribes
66-
// would resolve to the same place — without this, PUT-returned
67-
// headers point at legacy and the actual writes go to per-org once
68-
// the row exists. Pre-migration rows (row exists, column null) keep
69-
// their existing legacy behaviour because we only fall back to org
70-
// when there's no row at all.
62+
// No-row form: resolve via the org so the stream initialised here
63+
// matches what later appends/subscribes will land on once the row
64+
// is created.
7165
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
7266
session: maybeSession,
7367
organization: maybeSession ? null : authentication.environment.organization,
@@ -134,10 +128,7 @@ const loader = createLoaderApiRoute(
134128
},
135129
},
136130
async ({ params, request, authentication, resource }) => {
137-
// Same row-optional reasoning as the PUT handler above: if no row
138-
// exists yet, resolve via the org's current basin so the SSE
139-
// subscribe lands in the same place that subsequent appends will
140-
// (once the row gets created and stamped).
131+
// Same no-row fallback as PUT above.
141132
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
142133
session: resource.row,
143134
organization: resource.row ? null : authentication.environment.organization,

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,10 +395,6 @@ export class RunEngineTriggerTaskService {
395395
bulkActionId: body.options?.bulkActionId,
396396
planType,
397397
realtimeStreamsVersion: options.realtimeStreamsVersion,
398-
// Stamp the org's S2 basin onto the new TaskRun so
399-
// realtime read paths can resolve the basin without
400-
// joining `Organization`. Null in OSS / pre-backfill;
401-
// reads then fall back to the global basin env var.
402398
streamBasinName: environment.organization.streamBasinName,
403399
debounce: body.options?.debounce,
404400
annotations,

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -434,32 +434,15 @@ export async function setPlan(
434434
}
435435
}
436436

437-
/**
438-
* Best-effort enqueue: when an org's plan changes we reconcile its
439-
* stream-basin state. The reconciler handles every transition:
440-
*
441-
* free → paid: provision a dedicated basin with the plan's retention.
442-
* paid → paid: reconfigure the existing basin's retention.
443-
* paid → free: null `Organization.streamBasinName`. Future runs/sessions
444-
* flow to the shared global basin; the per-org basin
445-
* lingers until existing streams age out on their original
446-
* retention.
447-
* free → free: no-op.
448-
*
449-
* Idempotent and a no-op when per-org basins are disabled or billing
450-
* isn't configured. Failures are logged but never block the plan
451-
* change itself — billing has already accepted by the time we reach
452-
* this code.
453-
*/
437+
// Best-effort: failures are logged but never block the plan change.
438+
// The reconciler is idempotent and re-reads the plan when it runs, so
439+
// concurrent plan changes collapse to one pending job per org.
454440
async function enqueueStreamBasinReconcile(orgId: string) {
455441
try {
456442
const { commonWorker } = await import("~/v3/commonWorker.server");
457443
await commonWorker.enqueue({
458444
job: "v3.reconcileStreamBasinForOrg",
459445
payload: { orgId },
460-
// Per-org dedupe key — concurrent plan changes collapse to one
461-
// pending reconcile job. The job re-reads the current plan when
462-
// it executes, so the latest tier wins.
463446
id: `reconcileStreamBasin:${orgId}`,
464447
});
465448
} catch (error) {

0 commit comments

Comments
 (0)