Skip to content

Commit d3c03f8

Browse files
committed
fix(data-drains): extract sleepUntilAborted, honor abort across all destinations
1 parent f56c6a2 commit d3c03f8

6 files changed

Lines changed: 38 additions & 60 deletions

File tree

apps/sim/lib/data-drains/destinations/bigquery.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
33
import { JWT } from 'google-auth-library'
44
import { z } from 'zod'
5+
import { sleepUntilAborted } from '@/lib/data-drains/destinations/utils'
56
import type { DeliveryMetadata, DrainDestination } from '@/lib/data-drains/types'
67

78
const logger = createLogger('DataDrainBigQueryDestination')
@@ -215,21 +216,6 @@ const RETRYABLE_STATUSES = new Set([408, 429, 500, 502, 503, 504])
215216
const MAX_RETRY_ATTEMPTS = 3
216217
const BASE_RETRY_DELAY_MS = 250
217218

218-
function sleepUntilAborted(ms: number, signal: AbortSignal): Promise<void> {
219-
if (signal.aborted) return Promise.resolve()
220-
return new Promise((resolve) => {
221-
const onAbort = () => {
222-
clearTimeout(timeoutId)
223-
resolve()
224-
}
225-
const timeoutId = setTimeout(() => {
226-
signal.removeEventListener('abort', onAbort)
227-
resolve()
228-
}, ms)
229-
signal.addEventListener('abort', onAbort, { once: true })
230-
})
231-
}
232-
233219
function parseRetryAfter(header: string | null): number | null {
234220
if (!header) return null
235221
const seconds = Number(header)

apps/sim/lib/data-drains/destinations/datadog.ts

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { gzipSync } from 'node:zlib'
22
import { createLogger } from '@sim/logger'
33
import { toError } from '@sim/utils/errors'
44
import { z } from 'zod'
5+
import { sleepUntilAborted } from '@/lib/data-drains/destinations/utils'
56
import type { DeliveryMetadata, DrainDestination } from '@/lib/data-drains/types'
67

78
const logger = createLogger('DataDrainDatadogDestination')
@@ -70,7 +71,7 @@ function buildEndpoint(site: DatadogSite): string {
7071
function parseNdjson(body: Buffer): unknown[] {
7172
const text = body.toString('utf8')
7273
const rows: unknown[] = []
73-
for (const line of text.split('\n')) {
74+
for (const line of text.split(/\r?\n/)) {
7475
if (line.length === 0) continue
7576
rows.push(JSON.parse(line))
7677
}
@@ -129,21 +130,6 @@ function backoffWithJitter(attempt: number, retryAfterMs?: number): number {
129130
return exponential * (0.8 + Math.random() * 0.4)
130131
}
131132

132-
function sleepUntilAborted(ms: number, signal: AbortSignal): Promise<void> {
133-
if (signal.aborted) return Promise.resolve()
134-
return new Promise((resolve) => {
135-
const onAbort = () => {
136-
clearTimeout(timeoutId)
137-
resolve()
138-
}
139-
const timeoutId = setTimeout(() => {
140-
signal.removeEventListener('abort', onAbort)
141-
resolve()
142-
}, ms)
143-
signal.addEventListener('abort', onAbort, { once: true })
144-
})
145-
}
146-
147133
interface PreparedBody {
148134
body: Uint8Array | string
149135
headers: Record<string, string>

apps/sim/lib/data-drains/destinations/gcs.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { createLogger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
3-
import { sleep } from '@sim/utils/helpers'
43
import { generateShortId } from '@sim/utils/id'
54
import { JWT } from 'google-auth-library'
65
import { z } from 'zod'
6+
import { sleepUntilAborted } from '@/lib/data-drains/destinations/utils'
77
import type { DrainDestination } from '@/lib/data-drains/types'
88

99
const logger = createLogger('DataDrainGCSDestination')
@@ -229,7 +229,7 @@ async function fetchWithRetry(input: RetryRequestInput): Promise<void> {
229229
error: toError(error).message,
230230
})
231231
if (attempt < MAX_ATTEMPTS) {
232-
await sleep(backoffMs(attempt))
232+
await sleepUntilAborted(backoffMs(attempt), input.signal)
233233
continue
234234
}
235235
throw error
@@ -249,7 +249,7 @@ async function fetchWithRetry(input: RetryRequestInput): Promise<void> {
249249
}
250250
lastError = new Error(`GCS ${input.action} responded with HTTP ${response.status}`)
251251
const retryAfterMs = parseRetryAfter(response.headers.get('retry-after'))
252-
await sleep(backoffMs(attempt, retryAfterMs))
252+
await sleepUntilAborted(backoffMs(attempt, retryAfterMs), input.signal)
253253
}
254254
throw lastError instanceof Error
255255
? lastError

apps/sim/lib/data-drains/destinations/snowflake.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { createHash, createPublicKey } from 'node:crypto'
22
import { createLogger } from '@sim/logger'
33
import { toError } from '@sim/utils/errors'
4-
import { sleep } from '@sim/utils/helpers'
54
import { importPKCS8, SignJWT } from 'jose'
65
import { z } from 'zod'
6+
import { sleepUntilAborted } from '@/lib/data-drains/destinations/utils'
77
import type { DrainDestination } from '@/lib/data-drains/types'
88

99
const logger = createLogger('DataDrainSnowflakeDestination')
@@ -179,16 +179,22 @@ function parseNdjson(body: Buffer): string[] {
179179
* Parses a Retry-After header. Supports both delta-seconds and HTTP-date
180180
* formats. Returns the delay in milliseconds, or `null` if not parseable.
181181
*/
182+
/** Cap Retry-After to keep cancellation latency bounded, matching the other destinations. */
183+
const RETRY_AFTER_MAX_MS = 30_000
184+
182185
function parseRetryAfter(header: string | null): number | null {
183186
if (!header) return null
184187
const trimmed = header.trim()
185188
if (trimmed.length === 0) return null
186189
const seconds = Number(trimmed)
187-
if (Number.isFinite(seconds) && seconds >= 0) return Math.floor(seconds * 1000)
190+
if (Number.isFinite(seconds) && seconds >= 0) {
191+
return Math.min(Math.floor(seconds * 1000), RETRY_AFTER_MAX_MS)
192+
}
188193
const dateMs = Date.parse(trimmed)
189194
if (!Number.isNaN(dateMs)) {
190195
const delta = dateMs - Date.now()
191-
return delta > 0 ? delta : 0
196+
if (delta <= 0) return 0
197+
return Math.min(delta, RETRY_AFTER_MAX_MS)
192198
}
193199
return null
194200
}
@@ -254,7 +260,7 @@ async function executeStatement(input: ExecuteInput): Promise<void> {
254260
error: toError(error).message,
255261
})
256262
if (input.signal.aborted || attempt === EXECUTE_MAX_ATTEMPTS) throw error
257-
await sleep(computeBackoffDelay(attempt))
263+
await sleepUntilAborted(computeBackoffDelay(attempt), input.signal)
258264
continue
259265
}
260266
if (response.status === 202) {
@@ -282,7 +288,7 @@ async function executeStatement(input: ExecuteInput): Promise<void> {
282288
status: response.status,
283289
delayMs: delay,
284290
})
285-
await sleep(delay)
291+
await sleepUntilAborted(delay, input.signal)
286292
}
287293
throw lastError ?? new Error('Snowflake request failed after retries')
288294
}
@@ -309,7 +315,7 @@ async function pollStatement(input: PollInput): Promise<void> {
309315
let interval = POLL_INITIAL_INTERVAL_MS
310316
while (Date.now() < deadline) {
311317
if (input.signal.aborted) throw input.signal.reason ?? new Error('Aborted')
312-
await sleep(interval)
318+
await sleepUntilAborted(interval, input.signal)
313319
const response = await fetch(url, {
314320
headers: {
315321
Authorization: `Bearer ${input.jwt}`,
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/**
2+
* Sleep for `ms` milliseconds, resolving early if `signal` aborts. Used by
3+
* destination retry/poll loops so cancelled drain runs do not hang waiting on
4+
* a `setTimeout` that ignores the abort signal.
5+
*/
6+
export function sleepUntilAborted(ms: number, signal: AbortSignal): Promise<void> {
7+
if (signal.aborted) return Promise.resolve()
8+
return new Promise((resolve) => {
9+
const onAbort = () => {
10+
clearTimeout(timeoutId)
11+
resolve()
12+
}
13+
const timeoutId = setTimeout(() => {
14+
signal.removeEventListener('abort', onAbort)
15+
resolve()
16+
}, ms)
17+
signal.addEventListener('abort', onAbort, { once: true })
18+
})
19+
}

apps/sim/lib/data-drains/destinations/webhook.ts

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
secureFetchWithPinnedIP,
88
validateUrlWithDNS,
99
} from '@/lib/core/security/input-validation.server'
10+
import { sleepUntilAborted } from '@/lib/data-drains/destinations/utils'
1011
import type { DeliveryMetadata, DrainDestination } from '@/lib/data-drains/types'
1112

1213
const logger = createLogger('DataDrainWebhookDestination')
@@ -91,26 +92,6 @@ function sign(body: Buffer, secret: string, timestamp: number): string {
9192
return `t=${timestamp},${SIGNATURE_VERSION}=${hmac}`
9293
}
9394

94-
/**
95-
* Resolves after `ms` or as soon as `signal` aborts, whichever happens first.
96-
* The caller checks `signal.aborted` at the top of the next iteration to
97-
* surface the abort — keeping resolution side-effect-free here.
98-
*/
99-
function sleepUntilAborted(ms: number, signal: AbortSignal): Promise<void> {
100-
if (signal.aborted) return Promise.resolve()
101-
return new Promise((resolve) => {
102-
const onAbort = () => {
103-
clearTimeout(timeoutId)
104-
resolve()
105-
}
106-
const timeoutId = setTimeout(() => {
107-
signal.removeEventListener('abort', onAbort)
108-
resolve()
109-
}, ms)
110-
signal.addEventListener('abort', onAbort, { once: true })
111-
})
112-
}
113-
11495
function backoffWithJitter(attempt: number, retryAfterMs?: number): number {
11596
if (retryAfterMs !== undefined) {
11697
// Floor at 500ms so a misbehaving server returning Retry-After: 0 cannot

0 commit comments

Comments
 (0)