Commit 39f74aa

feat(data-drains): add GCS, Azure Blob, BigQuery, Snowflake, and Datadog destinations (#4552)
* feat(data-drains): add GCS, Azure Blob, BigQuery, Snowflake, and Datadog destinations
* fix(data-drains): address PR review comments
* fix(data-drains): extract sleepUntilAborted, honor abort across all destinations
* fix(data-drains): widen BigQuery projectId max and dedupe parseServiceAccount
* fix(data-drains): tighten GCS bucket contract and expose Azure endpointSuffix
* improvement(data-drains): extract normalizePrefix and buildObjectKey to shared utils
* fix(data-drains): retry BigQuery network errors; tighten Azure accountKey contract
  - BigQuery insertAll now wraps the fetch in try/catch inside the retry loop so DNS failures, socket resets, and timeouts are retried with backoff instead of propagating immediately.
  - Align azureBlobCredentialsBodySchema with the runtime schema (min 64 / max 120 / base64 regex) so obviously invalid keys are rejected at the API boundary rather than at drain-run time.
* improvement(data-drains): consolidate parseRetryAfter; add Datadog NDJSON line context
  - Extract a single parseRetryAfter helper (capped at 30s, returns number | null) into lib/data-drains/destinations/utils.ts and remove the five local copies in bigquery, datadog, gcs, snowflake, and webhook.
  - Datadog parseNdjson now wraps JSON.parse in try/catch and surfaces the failing line index, matching BigQuery's parser.
* fix(data-drains): correct Datadog size guard and Snowflake VARIANT limit
  - Datadog payload guard now checks the uncompressed size against the 5 MB limit and the wire size against the 6 MB compressed limit, so gzip cannot smuggle an oversized body past the client-side check.
  - Snowflake VARIANT limit is 16 MiB (16,777,216 bytes), not 16,000,000 bytes — payloads between 16 MB and 16 MiB were being rejected unnecessarily.
  - Drop the unused apiKey field on Datadog PostInput; the key is already embedded in the prepared request headers.
* improvement(data-drains): consolidate backoffWithJitter into shared utils
  Datadog, GCS, and webhook each had byte-identical backoff helpers (BASE 500ms, MAX 30s, jitter ±20%, Retry-After floor). Lift the helper into lib/data-drains/destinations/utils.ts alongside parseRetryAfter and sleepUntilAborted, and drop the per-file copies and their BASE_BACKOFF_MS/MAX_BACKOFF_MS constants.
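Editor's note: the consolidated helpers above have tight contracts (Retry-After capped at 30s; base 500ms, max 30s, jitter ±20%, Retry-After floor). A rough sketch of what lib/data-drains/destinations/utils.ts might contain; the helper names come from the commit messages, the bodies are illustrative assumptions:

```ts
// Sketch only: names are from the commit messages; bodies are assumed.

// Parse a Retry-After header (delta-seconds or HTTP-date), capped at 30s.
export function parseRetryAfter(header: string | null): number | null {
  if (!header) return null
  const seconds = Number(header)
  const ms = Number.isFinite(seconds)
    ? seconds * 1000
    : new Date(header).getTime() - Date.now()
  if (!Number.isFinite(ms) || ms <= 0) return null
  return Math.min(ms, 30_000)
}

// Exponential backoff: base 500ms, cap 30s, ±20% jitter, floored at the
// server's Retry-After hint when one was sent.
export function backoffWithJitter(attempt: number, retryAfterMs: number | null): number {
  const exponential = Math.min(500 * 2 ** attempt, 30_000)
  const jittered = exponential * (0.8 + Math.random() * 0.4)
  return Math.max(jittered, retryAfterMs ?? 0)
}

// Sleep that wakes immediately when the drain run's AbortSignal fires;
// callers re-check signal.aborted after waking.
export function sleepUntilAborted(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal.aborted) return resolve()
    const timer = setTimeout(done, ms)
    function done() {
      clearTimeout(timer)
      signal.removeEventListener('abort', done)
      resolve()
    }
    signal.addEventListener('abort', done, { once: true })
  })
}
```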
* fix(data-drains): align destinations with live provider specs
  Audited every destination against live AWS/GCS/Azure/BigQuery/Snowflake/Datadog/webhook docs and applied spec-correctness fixes:
  - S3: reserved bucket prefix amzn-s3-demo-, suffixes --x-s3/--table-s3; metadata byte formula excludes x-amz-meta- prefix per AWS spec
  - GCS: reject -./.- adjacency; UTF-8 prefix cap; forbid .well-known/acme-challenge/ prefix; ASCII-only x-goog-meta-* enforcement
  - BigQuery: insertId is 128 chars (not bytes); split DATASET_RE (ASCII) and TABLE_RE (Unicode L/M/N + connectors); UTF-8 byte cap on tableId
  - Snowflake: disambiguate org-account vs legacy locator account formats; requestId+retry=true for idempotent retries; server-side timeout=600; default column DATA uppercase to match unquoted canonical form
  - Azure: endpoint suffix allowlist (4 sovereign clouds); accountKey length(88) base64
  - Webhook: url max(2048); CRLF/NUL rejection on bearer/secret/sig header
* fix(data-drains): address PR review on snowflake poll + shared NDJSON parsing
  - snowflake pollStatement: per-attempt timeout via AbortSignal.any, retry on 429/5xx with Retry-After + jitter
  - bigquery parseNdjson error messages now 1-indexed
  - consolidate parseNdjson variants into shared parseNdjsonLines/parseNdjsonObjects in utils
* fix(data-drains): per-attempt fetch timeouts in gcs/bigquery, snowflake poll double-sleep
  - gcs.fetchWithRetry + bigquery.postInsertAll now use AbortSignal.any with a per-attempt timeout so a hung TCP connection cannot stall the drain
  - snowflake.pollStatement skips the next interval sleep when it just slept for retry backoff
* fix(data-drains): bigquery probe timeout + jittered retries, align Snowflake column default UI/docs
  - bigquery test() probe now uses AbortSignal.any + per-attempt timeout
  - bigquery insertAll retry switches to backoffWithJitter for thundering-herd avoidance
  - Snowflake column placeholder + docs say DATA (uppercase) to match the code default
* fix(data-drains): mirror webhook signingSecret min length in form gate
  isComplete now requires signingSecret >= 32 to match the contract/runtime schema so the Save button can't enable on a value that will fail server-side.
* fix(data-drains): validate JSON client-side for Snowflake before binding
  Switch Snowflake to parseNdjsonObjects so malformed rows are caught locally with 1-indexed line numbers instead of failing the whole INSERT server-side. Re-stringify each parsed object before binding to PARSE_JSON(?). Drop the now-unused parseNdjsonLines helper.
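Editor's note: the per-attempt timeout recurring in these fixes composes the run-level abort signal with a fresh timeout per try. A minimal sketch; the wrapper name and the 30s figure are assumptions, only the AbortSignal.any composition is from the commit messages:

```ts
// Sketch of the per-attempt timeout pattern described above.
const ATTEMPT_TIMEOUT_MS = 30_000 // illustrative value

async function fetchAttempt(url: string, init: RequestInit, run: AbortSignal): Promise<Response> {
  // Aborting the run cancels immediately; a hung TCP connection trips the
  // per-attempt timer instead of stalling the whole drain.
  const signal = AbortSignal.any([run, AbortSignal.timeout(ATTEMPT_TIMEOUT_MS)])
  return fetch(url, { ...init, signal })
}
```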
* fix(data-drains): cross-cutting audit pass against live provider docs
  - Azure: bound retryOptions on BlobServiceClient (SDK default tryTimeoutInMs is per-try unbounded; cap at 30s x 5 tries)
  - Webhook contract: mirror runtime — signingSecret.max(512), bearerToken.max(4096) + CRLF/NUL refine, signatureHeader charset + CRLF/NUL refine
  - S3 (lib + contract): reject bucket names with dash adjacent to dot; require https:// endpoint at the schema layer
  - Snowflake: bind original NDJSON line bytes (re-stringifying a JSON.parse'd value loses bigint precision beyond 2^53-1); check pollStatement 200 body for the SQL error envelope (sqlState/code)
  - Datadog: entry builder writes defaults first then user attrs then forced ddtags/message so user rows can't clobber routing fields; validate config.tags as comma-separated key:value pairs
  - registry.tsx: tighten isComplete predicates to mirror contract minimums (GCS bucket >= 3, Azure containerName >= 3 / accountKey === 88, BigQuery projectId >= 6, Snowflake account >= 3)
* fix(data-drains): force ddsource/service overrides on Datadog entries
  Previous fix placed ddsource/service before ...attrs, leaving them clobberable by a user row field. Per Datadog docs, service + ddsource pick the processing pipeline, so a drain's routing config must not be overridable per-row. Spread attrs first, then force all four reserved fields (ddsource, service, ddtags, message).
* fix(data-drains): preserve row-distinguishing index when BigQuery insertId overflows
  Truncating from the left dropped the index suffix, so any overflow would collapse all rows in a chunk to the same insertId and BigQuery would silently dedupe them. Path is unreachable today (UUIDs keep raw ~85 chars), but the overflow branch is now correct: hash the prefix, keep the index intact.
* fix(data-drains): refresh GCS token per retry, tighten Azure key regex
  - gcs: rebuild Authorization header per attempt via buildHeaders so token refresh from google-auth-library kicks in if a 5xx retry crosses the hour-long token lifetime
  - azure_blob: pin account-key regex to {0,2} trailing '=' (base64 of 64 bytes = exactly 88 chars with up to two '=' pad chars)
* fix(data-drains): address bugbot review of 6336948
  - gcs: allow 1-char dot-separated bucket components (e.g. "a.bucket") to match GCS naming rules — overall name is 3-63 (or up to 222 with dots), but per-component minimum is 1 per Google's spec
  - bigquery: drain the 401 response body before re-issuing the request with a refreshed token so undici can return the socket to the keep-alive pool
  - snowflake: hoist getJwt() above the perAttempt timer in executeStatement so JWT signing doesn't eat the network budget (matches the order already used in pollStatement)
* fix(data-drains): allow org-account Snowflake identifier with region suffix
  The account validation rejected `<orgname>-<acctname>.<region>.<cloud>` because `ACCOUNT_LOCATOR_RE`'s first segment forbade hyphens, while `ACCOUNT_ORG_RE` forbade dots. `normalizeAccountForJwt` already handles this composite form. Widen the first segment of `ACCOUNT_LOCATOR_RE` to allow hyphens so the boundary contract and the runtime schema accept what the JWT layer was already designed to process.
* fix(data-drains): drain retryable response bodies in datadog/gcs loops
  Mirrors the bigquery 401 fix. Without consuming the body before sleeping, undici can't return the socket to the keep-alive pool, so each retry leaks a TCP connection instead of reusing it.
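Editor's note: the Datadog entry-builder fix is the classic spread-then-force ordering. A sketch under stated assumptions; everything beyond the four reserved keys and the ordering (the function name, the 'sim' ddsource value, the message content) is illustrative:

```ts
// Sketch of the ordering fix: user attrs first, reserved fields forced last.
function buildEntry(
  row: Record<string, unknown>,
  cfg: { service: string },
  ddtags: string // config tags plus the auto-injected sim_* tags
) {
  return {
    ...row, // user row fields can no longer clobber routing fields because...
    ddsource: 'sim', // ...the four reserved fields are written after the spread
    service: cfg.service,
    ddtags,
    message: JSON.stringify(row), // assumed message payload
  }
}
```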
* fix(data-drains): drain snowflake poll bodies on 202 and retryable status
  Mirrors the bigquery/datadog/gcs drains. Long async statements can poll many times against the same connection; without consuming the body undici can't return the socket to the keep-alive pool, so each iteration leaks a connection until GC.
* fix(data-drains): consume success bodies; check Snowflake sqlState on 200
  - gcs: drain the body on success paths so undici can return the socket to the keep-alive pool
  - snowflake: drain the body on synchronous 200 OK and run the same sqlState envelope check pollStatement already does — otherwise a statement-level failure that completes synchronously would silently return success
* fix(data-drains): drain datadog and bigquery probe success bodies
  Same undici keep-alive issue as the prior fixes: postWithRetries returned the Response on success without draining (callers only read headers); the BigQuery `test()` probe returned without consuming the body. Both now drain before returning.
* chore(data-drains): regenerate enum migration as 0206 after staging rebase
* fix(data-drains): cap snowflake poll retries; tighten datadog tags min length
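Editor's note: the recurring undici fix across these messages is a single idiom. A minimal sketch (the helper name is assumed; the behavior it describes is from the commit messages):

```ts
// Consuming the body lets undici return the socket to the keep-alive pool;
// otherwise each retry/poll iteration leaks a TCP connection.
async function drainBody(res: Response): Promise<void> {
  try {
    await res.arrayBuffer() // read and discard
  } catch {
    // best-effort: draining must not mask the caller's own error handling
  }
}
```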
1 parent 0b2cfaf commit 39f74aa

24 files changed

Lines changed: 19784 additions & 151 deletions

apps/docs/content/docs/en/enterprise/data-drains.mdx

Lines changed: 58 additions & 2 deletions
```diff
@@ -1,11 +1,11 @@
 ---
 title: Data Drains
-description: Continuously export workflow logs, audit logs, and Mothership data to your own S3 bucket or HTTPS endpoint on a schedule
+description: Continuously export workflow logs, audit logs, and Mothership data to your own object store, data warehouse, observability platform, or HTTPS endpoint on a schedule
 ---
 
 import { FAQ } from '@/components/ui/faq'
 
-Data Drains let organization owners and admins on Enterprise plans continuously export Sim data to a destination they control — a customer-owned S3 bucket or an HTTPS webhook. A drain runs on a schedule, picks up only new rows since its last successful run, and writes them as NDJSON to the destination. Viewing drain configuration and run history is restricted to owners and admins as well, since destinations expose internal bucket names and webhook URLs.
+Data Drains let organization owners and admins on Enterprise plans continuously export Sim data to a destination they control — a customer-owned S3 bucket, Google Cloud Storage bucket, Azure Blob container, BigQuery table, Snowflake table, Datadog logs intake, or an HTTPS webhook. A drain runs on a schedule, picks up only new rows since its last successful run, and writes them to the destination. Viewing drain configuration and run history is restricted to owners and admins as well, since destinations expose internal bucket names, table identifiers, and webhook URLs.
 
 Drains are independent of [Data Retention](/enterprise/data-retention) but designed to compose with it — see [Pairing with Data Retention](#pairing-with-data-retention) below.
 
@@ -67,6 +67,62 @@ Object keys are deterministic:
 
 Objects are written with `AES256` server-side encryption.
 
+### Google Cloud Storage
+
+Writes one NDJSON object per delivered chunk to your GCS bucket.
+
+- **Bucket** — the bucket name. Must already exist; Sim does not create buckets.
+- **Prefix** *(optional)* — folder path inside the bucket. Trailing slash optional.
+- **Service account JSON key** — paste the full JSON key for a service account with `storage.objects.create` (and `storage.objects.delete` if you want "Test connection" to clean up its probe). Sim authenticates via OAuth2 service-account JWT and uploads through the GCS JSON API.
+
+Object names follow the same `{prefix}/{source}/{drainId}/{yyyy}/{mm}/{dd}/{runId}-{seq}.ndjson` layout as S3. Object metadata mirrors the S3 destination's `sim-*` keys via `x-goog-meta-*` headers.
+
+### Azure Blob Storage
+
+Writes one NDJSON block blob per delivered chunk to your container.
+
+- **Account name** — your storage account (3–24 lowercase chars).
+- **Container** — must already exist; Sim does not create containers.
+- **Prefix** *(optional)* — folder path inside the container.
+- **Account key** — a storage account access key with write access to the container.
+
+Blob names follow the same `{prefix}/{source}/{drainId}/{yyyy}/{mm}/{dd}/{runId}-{seq}.ndjson` layout. The `sim-*` metadata is exposed as Azure blob metadata (collapsed to lowercase per Azure's identifier rules).
+
+For sovereign clouds, set **Endpoint suffix** to `blob.core.usgovcloudapi.net` (US Gov), `blob.core.chinacloudapi.cn` (China), or `blob.core.cloudapi.de` (Germany).
+
+### Google BigQuery
+
+Streams each row into a target table via the `tabledata.insertAll` API, with per-row `insertId` dedup.
+
+- **Project ID** — your GCP project (supports domain-scoped IDs like `example.com:my-project`).
+- **Dataset ID / Table ID** — must already exist; Sim does not create tables. The table schema must accommodate the source's row shape (one column per top-level field, or a single `JSON`/`STRING` column with the rest as `ignoreUnknownValues`).
+- **Service account JSON key** — needs `roles/bigquery.dataEditor` (insert) and `roles/bigquery.metadataViewer` (for the `tables.get` probe used by "Test connection"). Sim authenticates via OAuth2 service-account JWT.
+
+Each row is sent with an `insertId` of `{drainId}-{runId}-{sequence}-{index}`. BigQuery dedupes inserts with the same `insertId` for ~60 seconds, so retries inside that window won't duplicate. If a chunk reports partial failure (`insertErrors`), the run fails with the offending row indices and an outer-driver retry may duplicate rows that already succeeded — the dispatcher's bounded retry minimizes this risk. Enforced per-request limits: 10 MB body, 50,000 rows.
+
+### Snowflake
+
+Inserts each row into a target VARIANT column via the Snowflake SQL API v2 with key-pair JWT auth.
+
+- **Account** — the Snowflake account identifier. Preferred form is `<orgname>-<acctname>` (no dots). Legacy `<locator>.<region>.<cloud>` is also accepted.
+- **User / Warehouse / Database / Schema / Table** — must already exist. The user needs `INSERT` privilege on the table and `USAGE` on the warehouse, database, and schema.
+- **Column** *(optional)* — target VARIANT column name. Defaults to `DATA` (matches Snowflake's unquoted identifier folding).
+- **Role** *(optional)* — Snowflake role to assume.
+- **Private key (PEM)** — PKCS8-encoded RSA private key. Register the matching public key on the Snowflake user via `ALTER USER ... SET RSA_PUBLIC_KEY = '...'`.
+
+Each chunk becomes a single `INSERT INTO "DB"."SCHEMA"."TABLE" ("col") VALUES (PARSE_JSON(?)), ...` with one TEXT binding per row. Identifiers are quoted to preserve case. The destination handles Snowflake's async 202-then-poll pattern transparently. Per-row JSON payloads are capped at 16 MiB to match Snowflake's VARIANT limit.
+
+### Datadog Logs
+
+POSTs each row as a log entry to Datadog's v2 logs intake.
+
+- **Site** — your Datadog site: `us1`, `us3`, `us5`, `eu1`, `ap1`, `ap2`, or `gov`.
+- **Service** *(optional)* — value for the reserved `service` field. Defaults to `sim`.
+- **Tags** *(optional)* — comma-separated `ddtags` appended to every entry alongside auto-injected `sim_drain_id:`, `sim_run_id:`, and `sim_source:` tags.
+- **API key** — a Datadog API key (not an Application key) with logs-write permission.
+
+Top-level row fields are auto-indexed as Datadog log attributes. The reserved fields `ddsource`, `service`, `ddtags`, and `message` are always set by Sim and override anything in the row. Payloads above 1 KB are gzip-compressed. Enforced limits match Datadog's intake: 5 MB per request (uncompressed), 1000 entries per request, 1 MB per entry.
+
 ### HTTPS Webhook
 
 POSTs each chunk as NDJSON to your endpoint.
```
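Editor's note: the deterministic object-key layout documented above for S3, GCS, and Azure could be produced by helpers like the following. normalizePrefix and buildObjectKey are named in the commit messages; the bodies here are assumptions that merely reproduce the documented layout:

```ts
// Sketch of helpers producing the documented
// {prefix}/{source}/{drainId}/{yyyy}/{mm}/{dd}/{runId}-{seq}.ndjson layout.
function normalizePrefix(prefix: string | undefined): string {
  if (!prefix) return ''
  const trimmed = prefix.replace(/^\/+|\/+$/g, '') // tolerate leading/trailing slashes
  return trimmed ? `${trimmed}/` : ''
}

function buildObjectKey(opts: {
  prefix?: string
  source: string
  drainId: string
  runId: string
  seq: number
  at: Date
}): string {
  const yyyy = opts.at.getUTCFullYear()
  const mm = String(opts.at.getUTCMonth() + 1).padStart(2, '0')
  const dd = String(opts.at.getUTCDate()).padStart(2, '0')
  return `${normalizePrefix(opts.prefix)}${opts.source}/${opts.drainId}/${yyyy}/${mm}/${dd}/${opts.runId}-${opts.seq}.ndjson`
}
```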

apps/sim/components/icons.tsx

Lines changed: 23 additions & 0 deletions
```diff
@@ -6849,3 +6849,26 @@ export function HexIcon(props: SVGProps<SVGSVGElement>) {
     </svg>
   )
 }
+
+export function BigQueryIcon(props: SVGProps<SVGSVGElement>) {
+  return (
+    <svg viewBox='0 0 24 24' xmlns='http://www.w3.org/2000/svg' {...props}>
+      <path
+        fill='#4386FA'
+        d='M12 2.5a9.5 9.5 0 1 0 5.81 17.02l3.4 3.4a1 1 0 0 0 1.41-1.42l-3.4-3.4A9.5 9.5 0 0 0 12 2.5Zm0 2a7.5 7.5 0 1 1 0 15 7.5 7.5 0 0 1 0-15Z'
+      />
+      <path fill='#4386FA' d='M8 11h1.6v4H8v-4Zm3 -2h1.6v6H11V9Zm3 1.5h1.6V15H14v-3.5Z' />
+    </svg>
+  )
+}
+
+export function SnowflakeIcon(props: SVGProps<SVGSVGElement>) {
+  return (
+    <svg viewBox='0 0 24 24' xmlns='http://www.w3.org/2000/svg' {...props}>
+      <path
+        fill='#29B5E8'
+        d='M12 2a1 1 0 0 1 1 1v3.59l2.3-2.3a1 1 0 1 1 1.4 1.42L13 9.41V12h2.6l3.7-3.7a1 1 0 0 1 1.4 1.4L18.42 12H22a1 1 0 1 1 0 2h-3.59l2.3 2.3a1 1 0 0 1-1.4 1.4L15.58 14H13v2.59l3.7 3.7a1 1 0 1 1-1.4 1.4L13 19.42V23a1 1 0 1 1-2 0v-3.58l-2.3 2.3a1 1 0 1 1-1.4-1.4l3.7-3.71V14H8.4l-3.7 3.7a1 1 0 0 1-1.4-1.4L5.58 14H2a1 1 0 0 1 0-2h3.59l-2.3-2.3a1 1 0 0 1 1.4-1.4L8.42 12H11V9.41L7.3 5.71a1 1 0 1 1 1.4-1.42l2.3 2.3V3a1 1 0 0 1 1-1Z'
+      />
+    </svg>
+  )
+}
```

apps/sim/ee/data-drains/components/data-drains-settings.tsx

Lines changed: 29 additions & 3 deletions
```diff
@@ -30,7 +30,14 @@ import {
   TableRow,
   toast,
 } from '@/components/emcn'
-import { S3Icon } from '@/components/icons'
+import {
+  AzureIcon,
+  BigQueryIcon,
+  DatadogIcon,
+  GoogleIcon,
+  S3Icon,
+  SnowflakeIcon,
+} from '@/components/icons'
 import { Input as BaseInput } from '@/components/ui'
 import type { CreateDataDrainBody, DataDrain, DataDrainRun } from '@/lib/api/contracts/data-drains'
 import { useSession } from '@/lib/auth/auth-client'
@@ -62,6 +69,11 @@ const SOURCE_LABELS: Record<(typeof SOURCE_TYPES)[number], string> = {
 
 const DESTINATION_LABELS: Record<(typeof DESTINATION_TYPES)[number], string> = {
   s3: 'Amazon S3',
+  gcs: 'Google Cloud Storage',
+  azure_blob: 'Azure Blob Storage',
+  datadog: 'Datadog',
+  bigquery: 'Google BigQuery',
+  snowflake: 'Snowflake',
   webhook: 'HTTPS webhook',
 }
 
@@ -73,8 +85,22 @@ const CADENCE_LABELS: Record<(typeof CADENCE_TYPES)[number], string> = {
 const SOURCE_OPTIONS = SOURCE_TYPES.map((t) => ({ value: t, label: SOURCE_LABELS[t] }))
 const CADENCE_OPTIONS = CADENCE_TYPES.map((t) => ({ value: t, label: CADENCE_LABELS[t] }))
 function getDestinationIcon(type: (typeof DESTINATION_TYPES)[number]) {
-  if (type !== 's3') return null
-  return <S3Icon className='size-[14px] flex-shrink-0 text-[#1B660F]' />
+  switch (type) {
+    case 's3':
+      return <S3Icon className='size-[14px] flex-shrink-0 text-[#1B660F]' />
+    case 'gcs':
+      return <GoogleIcon className='size-[14px] flex-shrink-0' />
+    case 'azure_blob':
+      return <AzureIcon className='size-[14px] flex-shrink-0' />
+    case 'datadog':
+      return <DatadogIcon className='size-[14px] flex-shrink-0' />
+    case 'bigquery':
+      return <BigQueryIcon className='size-[14px] flex-shrink-0' />
+    case 'snowflake':
+      return <SnowflakeIcon className='size-[14px] flex-shrink-0' />
+    default:
+      return null
+  }
 }
 
 const DESTINATION_OPTIONS = DESTINATION_TYPES.map((t) => ({
```