Skip to content

Commit 9d9970d

Browse files
committed
prevent duplicate messages by reconnecting and passing in the last sequence ID
1 parent eb70542 commit 9d9970d

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ export class StandardInputStreamManager implements InputStreamManager {
224224

225225
async #runTail(runId: string, streamId: string, signal: AbortSignal): Promise<void> {
226226
try {
227+
const lastSeq = this.seqNums.get(streamId);
227228
const stream = await this.apiClient.fetchStream<unknown>(
228229
runId,
229230
`input/${streamId}`,
@@ -232,6 +233,8 @@ export class StandardInputStreamManager implements InputStreamManager {
232233
baseUrl: this.baseUrl,
233234
// Max allowed by the SSE endpoint is 600s; the tail will reconnect on close
234235
timeoutInSeconds: 600,
236+
// Resume from last seen sequence number to avoid replaying history on reconnect
237+
lastEventId: lastSeq !== undefined ? String(lastSeq) : undefined,
235238
onPart: (part) => {
236239
const seqNum = parseInt(part.id, 10);
237240
if (Number.isFinite(seqNum)) {

0 commit comments

Comments
 (0)