2 changes: 1 addition & 1 deletion README.md
@@ -47,7 +47,7 @@
<li><strong>Row Level Security (RLS)</strong> template for preventing data access on unauthorized rows</li>
<li><strong>Point in Time Rollbacks</strong> for rolling back your database to any minute in the past 30 days</li>
<li><strong>Data Replication</strong> to scale reads beyond the 1,000 RPS limitation</li>
<li><strong>Data Syncing</strong> between local source and your database</li>
<li><strong><a href="./plugins/data-sync/README.md">Data Syncing</a></strong> — pull external Postgres/MySQL into internal SQLite (Issue #72)</li>
<li><strong>Scheduled CRON Tasks</strong> to execute code at desired intervals</li>
</ul>

77 changes: 77 additions & 0 deletions plugins/data-sync/README.md
@@ -0,0 +1,77 @@
# Data Sync Plugin

Implements **Issue #72**: incremental pull sync from an external relational database into StarbaseDB’s internal SQLite (Durable Object).

## Features

- **Pull sync** with a cursor over a monotonic `id` or an `updated_at`-style timestamp
- **Batching** + pagination on the upstream query
- **Idempotent writes** via SQLite `INSERT … ON CONFLICT DO UPDATE`
- **Metadata** in `tmp_data_sync_meta`, **logs** in `tmp_data_sync_log`
- **Retries** with exponential backoff on read failures
- **Admin-only HTTP API** for status, manual sync, and debug probe
- **PostgreSQL** over TCP (`pg` / Outerbase SDK) and via **Hyperdrive** (`postgres` package); **MySQL** uses the same job format with MySQL-specific paging SQL
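The retry behavior above can be sketched as a plain exponential-backoff loop. This is a hypothetical helper, not the plugin's actual implementation; the parameter names mirror `DATA_SYNC_MAX_RETRIES` and `DATA_SYNC_RETRY_BASE_MS`:

```typescript
// Sketch of retry-with-exponential-backoff for upstream read failures.
// Delays double each attempt: retryBaseMs, 2x, 4x, ...
async function withRetries<T>(
    fn: () => Promise<T>,
    maxRetries = 3,
    retryBaseMs = 400
): Promise<T> {
    let lastError: unknown
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
            return await fn()
        } catch (e) {
            lastError = e
            if (attempt === maxRetries) break
            const delay = retryBaseMs * 2 ** attempt
            await new Promise((resolve) => setTimeout(resolve, delay))
        }
    }
    throw lastError
}
```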

## Configuration

Cloudflare Workers do not receive arbitrary TOML tables as `env`. Use **`[vars]`** (and **secrets** for passwords).

Conceptually this matches a `[plugins.data-sync]` block like:

```toml
# Not loaded automatically — document your intent; mirror with [vars] below.
# [plugins.data-sync]
# sync_interval = 300
# tables = ["users", "products"]
```

### `wrangler.toml` example

```toml
[vars]
DATA_SYNC_ENABLED = "true"
DATA_SYNC_INTERVAL_SECONDS = "300"
# JSON array of job objects (escape quotes in TOML or use wrangler secret / dashboard)
DATA_SYNC_JOBS = """[{"externalTable":"public.users","localTable":"users","cursorKind":"incremental_id","cursorColumn":"id","pkColumns":["id"]}]"""
DATA_SYNC_BATCH_SIZE = "250"
DATA_SYNC_MAX_RETRIES = "3"
```

Also set the existing Starbase **external DB** variables (`EXTERNAL_DB_TYPE`, `EXTERNAL_DB_HOST`, etc.) or **Hyperdrive** so `dataSource.external` is populated.

### Job object

| Field | Description |
| --------------- | ---------------------------------------------------------------------------------------------- |
| `externalTable` | Upstream table, e.g. `public.users` |
| `localTable` | SQLite table in the DO (must already exist with compatible columns + PK/UNIQUE on `pkColumns`) |
| `cursorKind` | `incremental_id` or `timestamp` |
| `cursorColumn` | Column for paging (`id`, `updated_at`, …) |
| `pkColumns` | Primary key columns for upsert |
| `columnMap` | Optional map `{ "external_col": "sqlite_col" }` |
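A fully specified job, including the optional `columnMap`, looks like the following. The interface here is a local sketch of the shape implied by the table (the plugin's real type is `TableSyncJob` in `types.ts`), and the column rename is illustrative:

```typescript
// Local sketch of the job shape described in the table above.
interface TableSyncJob {
    externalTable: string
    localTable: string
    cursorKind: 'incremental_id' | 'timestamp'
    cursorColumn: string
    pkColumns: string[]
    columnMap?: Record<string, string>
}

// Sync public.users into the local `users` table, mapping one column name.
const job: TableSyncJob = {
    externalTable: 'public.users',
    localTable: 'users',
    cursorKind: 'incremental_id',
    cursorColumn: 'id',
    pkColumns: ['id'],
    columnMap: { email_address: 'email' },
}

// DATA_SYNC_JOBS carries an array of these, JSON-encoded:
const DATA_SYNC_JOBS = JSON.stringify([job])
```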

## HTTP API (Bearer **admin** token)

| Method | Path | Description |
| ------ | ------------------------ | ----------------------------------------------------------------------------------------- |
| `GET` | `/data-sync/sync-status` | Metadata rows + recent log |
| `POST` | `/data-sync/sync-data` | Run sync. Optional JSON body: `{ "tables": ["users"] }` to filter by **local** table name |
| `GET` | `/data-sync/debug` | Redacted external config + `SELECT 1` probe |
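A manual sync call is a plain authenticated POST. This minimal client sketch builds the request from the table above; the base URL and token are placeholders:

```typescript
// Build a POST /data-sync/sync-data request with the admin bearer token.
// Pass `tables` to filter by local table name, per the table above.
function syncRequest(
    baseUrl: string,
    adminToken: string,
    tables?: string[]
): Request {
    return new Request(`${baseUrl}/data-sync/sync-data`, {
        method: 'POST',
        headers: {
            Authorization: `Bearer ${adminToken}`,
            'Content-Type': 'application/json',
        },
        body: JSON.stringify(tables ? { tables } : {}),
    })
}
```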

## Scheduled sync (CRON)

`DATA_SYNC_INTERVAL_SECONDS` is **informational** only. Schedule sync with:

1. **Cloudflare Workers Cron Triggers** — add a cron trigger in `wrangler.toml` and, from the `scheduled` handler, `fetch` `POST /data-sync/sync-data` on the same host with the admin token, **or**
2. **`CronPlugin`** — in `src/index.ts`, register a task that performs an HTTP callback to `/data-sync/sync-data`, **or**
3. External scheduler (GitHub Actions, etc.) calling the same endpoint.
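For option 1, the trigger itself is a standard Cron Triggers block in `wrangler.toml` (the 5-minute schedule here is an example, not a requirement):

```toml
# wrangler.toml — invoke the Worker's `scheduled` handler every 5 minutes
[triggers]
crons = ["*/5 * * * *"]
```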

## Local demo

See `example/README.md` and `example/docker-compose.yml`.

## Edge notes

- Keep **batch sizes** modest (default 250, max 1000) to respect CPU/time limits.
- **Hyperdrive** is recommended for Postgres from Workers in production.
- Ensure the SQLite side has a **UNIQUE** or **PRIMARY KEY** constraint matching `pkColumns` so `ON CONFLICT` works.
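The UNIQUE/PRIMARY KEY requirement in the last bullet comes from how an `ON CONFLICT` upsert is formed. A simplified sketch of building such a statement (a hypothetical helper, not the plugin's internal code):

```typescript
// Build an idempotent SQLite upsert for one row of a batch. The ON CONFLICT
// target must match a PRIMARY KEY or UNIQUE constraint on `pkColumns`,
// otherwise SQLite rejects the statement.
function buildUpsertSql(
    localTable: string,
    columns: string[],
    pkColumns: string[]
): string {
    const cols = columns.map((c) => `"${c}"`).join(', ')
    const placeholders = columns.map(() => '?').join(', ')
    const updates = columns
        .filter((c) => !pkColumns.includes(c))
        .map((c) => `"${c}" = excluded."${c}"`)
        .join(', ')
    return (
        `INSERT INTO "${localTable}" (${cols}) VALUES (${placeholders}) ` +
        `ON CONFLICT (${pkColumns.map((c) => `"${c}"`).join(', ')}) ` +
        `DO UPDATE SET ${updates}`
    )
}
```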
110 changes: 110 additions & 0 deletions plugins/data-sync/adapter.ts
@@ -0,0 +1,110 @@
/**
* External database read adapter — PostgreSQL / Hyperdrive first; pluggable interface.
* Writes always go to SQLite via Durable Object RPC (not through this adapter).
*/
import postgres from 'postgres'
import type { DataSource, ExternalDatabaseSource } from '../../src/types'
import type { StarbaseDBConfiguration } from '../../src/handler'
import { executeSDKQuery } from '../../src/operation'

export interface ExternalReadAdapter {
/** Run a read-only statement on the external database; returns row objects */
query<T extends Record<string, unknown> = Record<string, unknown>>(
sql: string,
params?: unknown[]
): Promise<T[]>
}

function isHyperdrivePostgres(
ext: ExternalDatabaseSource | undefined
): ext is Extract<ExternalDatabaseSource, { connectionString: string }> {
return (
!!ext &&
ext.dialect === 'postgresql' &&
'connectionString' in ext &&
!!ext.connectionString
)
}

function isHostPostgres(
ext: ExternalDatabaseSource | undefined
): ext is Extract<
ExternalDatabaseSource,
{ dialect: 'postgresql'; host: string }
> {
return !!ext && ext.dialect === 'postgresql' && 'host' in ext && !!ext.host
}

/**
* Create a reader for the external DB configured on `dataSource.external`.
* Reuses StarbaseDB's existing drivers (pg via SDK) or `postgres` for Hyperdrive.
*/
export function createExternalReadAdapter(
dataSource: DataSource,
config: StarbaseDBConfiguration,
ctx?: ExecutionContext
): ExternalReadAdapter | null {
const ext = dataSource.external
if (!ext) return null

if (isHyperdrivePostgres(ext)) {
return {
async query(sql, params = []) {
const sqlConn = postgres(ext.connectionString, {
max: 1,
fetch_types: false,
})
try {
return (await sqlConn.unsafe(
sql,
params as never[]
)) as Record<string, unknown>[]
} finally {
if (ctx) ctx.waitUntil(sqlConn.end())
else await sqlConn.end()
}
},
}
}

if (isHostPostgres(ext) || ext.dialect === 'mysql') {
const readSource: DataSource = {
...dataSource,
source: 'external',
external: ext,
}
return {
async query(sql, params = []) {
const rows = await executeSDKQuery({
sql,
params,
dataSource: readSource,
config,
})
return (Array.isArray(rows) ? rows : []) as Record<
string,
unknown
>[]
},
}
}

return null
}

/** Quote a PostgreSQL identifier (schema/table/column) — validates simple names */
export function quotePgIdent(ident: string): string {
const parts = ident
.split('.')
.map((p) => p.trim())
.filter(Boolean)
const safe = /^[a-zA-Z_][a-zA-Z0-9_]*$/
for (const p of parts) {
if (!safe.test(p)) {
throw new Error(
`[data-sync] Invalid PostgreSQL identifier segment: ${p}`
)
}
}
return parts.map((p) => `"${p.replace(/"/g, '""')}"`).join('.')
}
32 changes: 32 additions & 0 deletions plugins/data-sync/config.test.ts
@@ -0,0 +1,32 @@
import { describe, expect, it } from 'vitest'
import { loadDataSyncConfig } from './config'

describe('loadDataSyncConfig', () => {
it('parses jobs JSON', () => {
const jobs = [
{
externalTable: 'public.users',
localTable: 'users',
cursorKind: 'incremental_id',
cursorColumn: 'id',
pkColumns: ['id'],
},
]
const c = loadDataSyncConfig({
DATA_SYNC_ENABLED: 'true',
DATA_SYNC_JOBS: JSON.stringify(jobs),
DATA_SYNC_BATCH_SIZE: '100',
})
expect(c.enabled).toBe(true)
expect(c.jobs).toHaveLength(1)
expect(c.jobs[0].localTable).toBe('users')
expect(c.batchSize).toBe(100)
})

it('defaults when env empty', () => {
const c = loadDataSyncConfig({})
expect(c.enabled).toBe(false)
expect(c.jobs).toEqual([])
expect(c.syncIntervalSeconds).toBe(300)
})
})
119 changes: 119 additions & 0 deletions plugins/data-sync/config.ts
@@ -0,0 +1,119 @@
import type { DataSyncPluginConfig, TableSyncJob } from './types'

/** Environment / wrangler vars consumed by the data-sync plugin */
export interface DataSyncEnv {
DATA_SYNC_ENABLED?: string
/** Seconds — informational; use Worker CRON or CronPlugin to invoke sync */
DATA_SYNC_INTERVAL_SECONDS?: string
/**
* JSON array of TableSyncJob objects.
* Example:
* [{"externalTable":"public.users","localTable":"users","cursorKind":"incremental_id","cursorColumn":"id","pkColumns":["id"]}]
*/
DATA_SYNC_JOBS?: string
DATA_SYNC_BATCH_SIZE?: string
DATA_SYNC_MAX_RETRIES?: string
DATA_SYNC_RETRY_BASE_MS?: string
}

const DEFAULT_BATCH = 250
const DEFAULT_RETRIES = 3
const DEFAULT_RETRY_BASE_MS = 400

function truthy(v: string | undefined): boolean {
if (!v) return false
const s = v.toLowerCase().trim()
return s === '1' || s === 'true' || s === 'yes' || s === 'on'
}

function parseJobsJson(raw: string | undefined): TableSyncJob[] {
if (!raw?.trim()) return []
try {
const parsed = JSON.parse(raw) as unknown
if (!Array.isArray(parsed)) {
console.error('[data-sync] DATA_SYNC_JOBS must be a JSON array')
return []
}
const jobs: TableSyncJob[] = []
for (const item of parsed) {
if (!item || typeof item !== 'object') continue
const j = item as Record<string, unknown>
const externalTable = String(j.externalTable ?? '')
const localTable = String(
j.localTable ?? externalTable.split('.').pop() ?? ''
)
const cursorKind =
j.cursorKind === 'timestamp' ? 'timestamp' : 'incremental_id'
const cursorColumn = String(
j.cursorColumn ??
(cursorKind === 'timestamp' ? 'updated_at' : 'id')
)
const pkColumns = Array.isArray(j.pkColumns)
? j.pkColumns.map((x) => String(x))
: j.pkColumns
? [String(j.pkColumns)]
: ['id']
const columnMap =
j.columnMap && typeof j.columnMap === 'object'
? (j.columnMap as Record<string, string>)
: undefined
if (!externalTable || !localTable) {
console.warn(
'[data-sync] Skipping job with missing externalTable/localTable',
item
)
continue
}
jobs.push({
externalTable,
localTable,
cursorKind,
cursorColumn,
pkColumns,
columnMap,
})
}
return jobs
} catch (e) {
console.error('[data-sync] Failed to parse DATA_SYNC_JOBS:', e)
return []
}
}

/**
* Resolve plugin configuration from Worker environment variables.
* (Wrangler does not pass arbitrary TOML tables into `env`; use `vars` or secrets.)
*/
export function loadDataSyncConfig(env: DataSyncEnv): DataSyncPluginConfig {
const interval = Number(env.DATA_SYNC_INTERVAL_SECONDS ?? '300')
return {
enabled: truthy(env.DATA_SYNC_ENABLED),
syncIntervalSeconds:
Number.isFinite(interval) && interval > 0 ? interval : 300,
jobs: parseJobsJson(env.DATA_SYNC_JOBS),
batchSize: Math.min(
1000,
Math.max(
1,
Number(env.DATA_SYNC_BATCH_SIZE ?? DEFAULT_BATCH) ||
DEFAULT_BATCH
)
),
maxRetries: Math.min(
10,
Math.max(
0,
Number(env.DATA_SYNC_MAX_RETRIES ?? DEFAULT_RETRIES) ||
DEFAULT_RETRIES
)
),
retryBaseMs: Math.min(
10_000,
Math.max(
50,
Number(env.DATA_SYNC_RETRY_BASE_MS ?? DEFAULT_RETRY_BASE_MS) ||
DEFAULT_RETRY_BASE_MS
)
),
}
}