Skip to content

Commit 31e4753

Browse files
authored
fix(sdk): handle locked ReadableStream when retrying batch trigger (#2917)
When fetch crashes mid-stream during batch item upload (e.g., connection reset, timeout), the request stream may remain locked by fetch's internal reader. Attempting to cancel a locked stream throws 'Invalid state: ReadableStream is locked', causing the batch operation to fail. Added safeStreamCancel() helper that gracefully handles locked streams by catching and ignoring the locked error. The stream will be cleaned up by garbage collection when fetch eventually releases the reader. Fixes customer issue where batchTrigger failed with ReadableStream locked error during network instability.
1 parent 8bc6b99 commit 31e4753

File tree

3 files changed

+89
-8
lines changed

3 files changed

+89
-8
lines changed

.changeset/gentle-streams-flow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Fix batch trigger failing with "ReadableStream is locked" error when network failures occur mid-stream. Added safe stream cancellation that gracefully handles locked streams during retry attempts.

packages/core/src/v3/apiClient/index.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -444,14 +444,14 @@ export class ApiClient {
444444

445445
if (retryResult.retry) {
446446
// Cancel the request stream before retry to prevent tee() from buffering
447-
await forRequest.cancel();
447+
await safeStreamCancel(forRequest);
448448
await sleep(retryResult.delay);
449449
// Use the backup stream for retry
450450
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
451451
}
452452

453453
// Not retrying - cancel the backup stream
454-
await forRetry.cancel();
454+
await safeStreamCancel(forRetry);
455455

456456
const errText = await response.text().catch((e) => (e as Error).message);
457457
let errJSON: Object | undefined;
@@ -471,7 +471,7 @@ export class ApiClient {
471471

472472
if (!parsed.success) {
473473
// Cancel backup stream since we're throwing
474-
await forRetry.cancel();
474+
await safeStreamCancel(forRetry);
475475
throw new Error(
476476
`Invalid response from server for batch ${batchId}: ${parsed.error.message}`
477477
);
@@ -484,14 +484,14 @@ export class ApiClient {
484484

485485
if (delay) {
486486
// Cancel the request stream before retry to prevent tee() from buffering
487-
await forRequest.cancel();
487+
await safeStreamCancel(forRequest);
488488
// Retry with the backup stream
489489
await sleep(delay);
490490
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
491491
}
492492

493493
// No more retries - cancel backup stream and throw descriptive error
494-
await forRetry.cancel();
494+
await safeStreamCancel(forRetry);
495495
throw new BatchNotSealedError({
496496
batchId,
497497
enqueuedCount: parsed.data.enqueuedCount ?? 0,
@@ -502,7 +502,7 @@ export class ApiClient {
502502
}
503503

504504
// Success - cancel the backup stream to release resources
505-
await forRetry.cancel();
505+
await safeStreamCancel(forRetry);
506506

507507
return parsed.data;
508508
} catch (error) {
@@ -519,13 +519,13 @@ export class ApiClient {
519519
const delay = calculateNextRetryDelay(retryOptions, attempt);
520520
if (delay) {
521521
// Cancel the request stream before retry to prevent tee() from buffering
522-
await forRequest.cancel();
522+
await safeStreamCancel(forRequest);
523523
await sleep(delay);
524524
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
525525
}
526526

527527
// No more retries - cancel the backup stream
528-
await forRetry.cancel();
528+
await safeStreamCancel(forRetry);
529529

530530
// Wrap in a more descriptive error
531531
const cause = error instanceof Error ? error : new Error(String(error));
@@ -1731,6 +1731,30 @@ function sleep(ms: number): Promise<void> {
17311731
return new Promise((resolve) => setTimeout(resolve, ms));
17321732
}
17331733

1734+
/**
1735+
* Safely cancels a ReadableStream, handling the case where it might be locked.
1736+
*
1737+
* When fetch uses a ReadableStream as a request body and an error occurs mid-transfer
1738+
* (connection reset, timeout, etc.), the stream may remain locked by fetch's internal reader.
1739+
* Attempting to cancel a locked stream throws "Invalid state: ReadableStream is locked".
1740+
*
1741+
* This function gracefully handles that case by catching the error and doing nothing -
1742+
* the stream will be cleaned up by garbage collection when the reader is released.
1743+
*/
1744+
async function safeStreamCancel(stream: ReadableStream<unknown>): Promise<void> {
1745+
try {
1746+
await stream.cancel();
1747+
} catch (error) {
1748+
// Ignore "locked" errors - the stream will be cleaned up when the reader is released.
1749+
// This happens when fetch crashes mid-read and doesn't release the reader lock.
1750+
if (error instanceof TypeError && String(error).includes("locked")) {
1751+
return;
1752+
}
1753+
// Re-throw unexpected errors
1754+
throw error;
1755+
}
1756+
}
1757+
17341758
// ============================================================================
17351759
// NDJSON Stream Helpers
17361760
// ============================================================================

packages/core/src/v3/apiClient/streamBatchItems.test.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,58 @@ describe("streamBatchItems stream cancellation on retry", () => {
398398
expect(cancelCallCount).toBeGreaterThanOrEqual(1);
399399
});
400400

401+
it("handles locked stream when connection error occurs mid-read", async () => {
402+
// This test simulates the real-world scenario where fetch throws an error
403+
// while still holding the reader lock on the request body stream.
404+
// This can happen with connection resets, timeouts, or network failures.
405+
let callIndex = 0;
406+
407+
const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
408+
const currentAttempt = callIndex;
409+
callIndex++;
410+
411+
if (init?.body && init.body instanceof ReadableStream) {
412+
if (currentAttempt === 0) {
413+
// First attempt: Get a reader and start reading, but throw while still holding the lock.
414+
// This simulates a connection error that happens mid-transfer.
415+
const reader = init.body.getReader();
416+
await reader.read(); // Start reading
417+
// DON'T release the lock - this simulates fetch crashing mid-read
418+
throw new TypeError("Connection reset by peer");
419+
}
420+
421+
// Subsequent attempts: consume and release normally
422+
await consumeAndRelease(init.body);
423+
}
424+
425+
// Second attempt: success
426+
return {
427+
ok: true,
428+
json: () =>
429+
Promise.resolve({
430+
id: "batch_test123",
431+
itemsAccepted: 10,
432+
itemsDeduplicated: 0,
433+
sealed: true,
434+
}),
435+
};
436+
});
437+
globalThis.fetch = mockFetch;
438+
439+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
440+
441+
// This should NOT throw "ReadableStream is locked" error
442+
// Instead it should gracefully handle the locked stream and retry
443+
const result = await client.streamBatchItems(
444+
"batch_test123",
445+
[{ index: 0, task: "test-task", payload: "{}" }],
446+
{ retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
447+
);
448+
449+
expect(result.sealed).toBe(true);
450+
expect(mockFetch).toHaveBeenCalledTimes(2);
451+
});
452+
401453
it("does not leak memory by leaving tee branches unconsumed during multiple retries", async () => {
402454
let cancelCallCount = 0;
403455
let callIndex = 0;

0 commit comments

Comments
 (0)