Skip to content

Commit b065509

Browse files
committed
fix(webapp): row-optional session-channel routes default to org basin
PUT /realtime/v1/sessions/:session/:io and the SSE GET loader on the same path are row-optional — `:session` may be a `chatId` (externalId) that hasn't been upserted yet. When the row is missing, both used to fall through `resolveStreamBasin` to the legacy global basin. If the row was then created with a per-org basin stamp, follow-up appends and SSE subscribes resolved to per-org while the PUT-returned headers still pointed at legacy — caller writes via those headers landed in the wrong place. Resolve via the org's current basin when the row is absent. A fresh session row would be stamped with that same basin at create time, so all subsequent ops converge. Pre-migration rows (row exists, column null) keep their legacy fallback because `organization` is only passed in the no-row branch — `session.streamBasinName === null` still falls through to the env var, not to the org column. Verified by curl: PUT against a fresh externalId for an org with a per-org basin returns `X-S2-Basin: triggerdotdev-dev-org-<orgId>`; same call for a free-org key still returns the legacy basin.
1 parent 871b993 commit b065509

2 files changed

Lines changed: 26 additions & 1 deletion

File tree

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,18 @@ const { action } = createActionApiRoute(
5959
});
6060
}
6161

62+
// When the row is missing (externalId form, row not yet upserted),
63+
// default to the org's current basin instead of falling through to
64+
// the legacy global. A fresh session row would be stamped with the
65+
// org's basin at creation time, so subsequent appends/subscribes
66+
// would resolve to the same place — without this, PUT-returned
67+
// headers point at legacy and the actual writes go to per-org once
68+
// the row exists. Pre-migration rows (row exists, column null) keep
69+
// their existing legacy behaviour because we only fall back to org
70+
// when there's no row at all.
6271
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
6372
session: maybeSession,
73+
organization: maybeSession ? null : authentication.environment.organization,
6474
});
6575

6676
if (!(realtimeStream instanceof S2RealtimeStreams)) {
@@ -124,8 +134,13 @@ const loader = createLoaderApiRoute(
124134
},
125135
},
126136
async ({ params, request, authentication, resource }) => {
137+
// Same row-optional reasoning as the PUT handler above: if no row
138+
// exists yet, resolve via the org's current basin so the SSE
139+
// subscribe lands in the same place that subsequent appends will
140+
// (once the row gets created and stamped).
127141
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
128142
session: resource.row,
143+
organization: resource.row ? null : authentication.environment.organization,
129144
});
130145

131146
if (!(realtimeStream instanceof S2RealtimeStreams)) {

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,27 +34,37 @@ export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRea
3434
*
3535
* 1. `run.streamBasinName` (set at trigger time, immutable per-run)
3636
* 2. `session.streamBasinName` (set at session create time)
37-
* 3. `REALTIME_STREAMS_S2_BASIN` (the legacy / OSS / pre-backfill global)
37+
* 3. `organization.streamBasinName` (current org basin — only useful
38+
* when neither a run nor a session row exists yet, e.g. PUT init
39+
* against an externalId before the row is created)
40+
* 4. `REALTIME_STREAMS_S2_BASIN` (the legacy / OSS / pre-backfill global)
3841
*
3942
* Old runs / sessions that pre-date the per-org-basins migration carry
4043
* `null` columns and fall through to the global basin, which is the
4144
* one their streams were originally created in. Once the legacy basin
4245
* drains via S2 retention (~30d on prod today), this fallback can be
4346
* dropped — but it's cheap to keep as a safety net.
4447
*
48+
* Callers should only pass `organization` when they know the row-bearing
49+
* ref is absent (not when its column is null) — otherwise a pre-migration
50+
* row's null column would short-circuit to the org's *current* basin
51+
* instead of the legacy one its streams actually live in.
52+
*
4553
* OSS / s2-lite installs always hit the global path because the
4654
* provisioner is gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`
4755
* and `streamBasinName` is never written.
4856
*/
4957
export type StreamBasinContext = {
5058
run?: { streamBasinName: string | null } | null;
5159
session?: { streamBasinName: string | null } | null;
60+
organization?: { streamBasinName: string | null } | null;
5261
};
5362

5463
export function resolveStreamBasin(ctx: StreamBasinContext): string | undefined {
5564
return (
5665
ctx.run?.streamBasinName ??
5766
ctx.session?.streamBasinName ??
67+
ctx.organization?.streamBasinName ??
5868
env.REALTIME_STREAMS_S2_BASIN ??
5969
undefined
6070
);

0 commit comments

Comments
 (0)