Skip to content

Commit 5629d2b

Browse files
committed
fix(sdk): drop session-stream auto-reconnect after explicit disconnect
`StandardSessionStreamManager#ensureTailConnected` re-subscribes the SSE tail in `.finally` whenever handlers or once-waiters remain on the key. That's the right move for unexpected tail crashes, but wrong when `session.in.wait()` calls `disconnectStream` to suspend the run via a waitpoint: the run-level `stopInput.on(...)` registered at the top of the `chat.agent` loop keeps the handlers set non-empty, so during the suspend window the tail silently resurrects, the next user message arrives at S2, the tail dispatches it, `stopSub`'s "kind === stop" filter rejects it, the data falls into the buffer, the waitpoint *also* delivers the same record, the SDK resumes, and on the next turn's `messagesInput.on(...)` registration the buffer drain re-fires the handler — `pendingMessages` ends up holding a duplicate of the just-consumed message and the loop runs an extra LLM turn with identical content. Track explicit teardown via `explicitlyDisconnected: Set<string>`. `disconnectStream` adds the key, `.finally` bails when set, `on()` / `once()` clear it so future re-attaches reconnect normally. Honors `wait()`'s expectation that explicit disconnect ⇒ no records buffered or delivered until a fresh `on()`/`once()`, while preserving auto-reconnect for legitimate failures (network drops, etc.). Verified end-to-end against a `chat.agent` reproduction that previously fired three turns per submitted message after suspend; with the fix exactly one turn per message, single LLM call, single persisted assistant reply. Trivial: `wait()` extracts `nextSeq` to a local for readability.
1 parent d221490 commit 5629d2b

2 files changed

Lines changed: 32 additions & 1 deletion

File tree

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ export class StandardSessionStreamManager implements SessionStreamManager {
3636
private onceWaiters = new Map<string, OnceWaiter[]>();
3737
private buffer = new Map<string, unknown[]>();
3838
private tails = new Map<string, TailState>();
39+
// Keys that were explicitly torn down by `disconnectStream`. The tail's
40+
// `.finally` reconnect path checks this so a long-lived persistent handler
41+
// (e.g. `chat.agent`'s run-level `stopInput.on(...)`) doesn't silently
42+
// resurrect the tail mid-`session.in.wait()` and re-deliver the record
43+
// that's already being delivered out-of-band via the waitpoint.
44+
private explicitlyDisconnected = new Set<string>();
3945
private seqNums = new Map<string, number>();
4046

4147
constructor(
@@ -58,6 +64,9 @@ export class StandardSessionStreamManager implements SessionStreamManager {
5864
}
5965
handlerSet.add(handler);
6066

67+
// Explicit re-attach clears the "explicitly disconnected" suppression
68+
// so the tail can subscribe again now that callers want delivery back.
69+
this.explicitlyDisconnected.delete(key);
6170
this.#ensureTailConnected(sessionId, io);
6271

6372
const buffered = this.buffer.get(key);
@@ -85,6 +94,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
8594
): InputStreamOncePromise<unknown> {
8695
const key = keyFor(sessionId, io);
8796

97+
this.explicitlyDisconnected.delete(key);
8898
this.#ensureTailConnected(sessionId, io);
8999

90100
const buffered = this.buffer.get(key);
@@ -168,6 +178,12 @@ export class StandardSessionStreamManager implements SessionStreamManager {
168178
disconnectStream(sessionId: string, io: SessionChannelIO): void {
169179
const key = keyFor(sessionId, io);
170180
const tail = this.tails.get(key);
181+
const bufferedSize = this.buffer.get(key)?.length ?? 0;
182+
// Mark as explicitly disconnected BEFORE we abort, so the tail's
183+
// `.finally` reconnect path sees the flag when it runs (which can be
184+
// synchronous in the AbortError catch). Cleared on the next explicit
185+
// `on()`/`once()`.
186+
this.explicitlyDisconnected.add(key);
171187
if (tail) {
172188
tail.abortController.abort();
173189
this.tails.delete(key);
@@ -223,6 +239,20 @@ export class StandardSessionStreamManager implements SessionStreamManager {
223239
.finally(() => {
224240
this.tails.delete(key);
225241

242+
// If the tail was torn down explicitly via `disconnectStream`,
243+
// honor that — the caller (typically `session.in.wait()`) is
244+
// suspending the run and expects no records to be buffered or
245+
// delivered until a fresh `on()` / `once()` re-attaches. Without
246+
// this guard a run-level persistent handler (e.g. `chat.agent`'s
247+
// `stopInput.on(...)`) would auto-reconnect during the suspend
248+
// window, the resurrected tail would receive the same record the
249+
// waitpoint just delivered, and that record would land in the
250+
// buffer where the next turn's `messagesInput.on(...)` drains it
251+
// and runs a duplicate turn.
252+
if (this.explicitlyDisconnected.has(key)) {
253+
return;
254+
}
255+
226256
const hasHandlers = this.handlers.has(key) && this.handlers.get(key)!.size > 0;
227257
const hasWaiters =
228258
this.onceWaiters.has(key) && this.onceWaiters.get(key)!.length > 0;

packages/trigger-sdk/src/v3/sessions.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,8 @@ export class SessionInputChannel {
617617
// Advance the seq counter so the SSE tail doesn't replay the
618618
// record that was consumed via the waitpoint.
619619
const prevSeq = sessionStreams.lastSeqNum(this.sessionId, "in");
620-
sessionStreams.setLastSeqNum(this.sessionId, "in", (prevSeq ?? -1) + 1);
620+
const nextSeq = (prevSeq ?? -1) + 1;
621+
sessionStreams.setLastSeqNum(this.sessionId, "in", nextSeq);
621622

622623
return { ok: true as const, output: data as T };
623624
} else {

0 commit comments

Comments
 (0)