Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions plugins/replication/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Replication plugin

Pulls rows from an external Postgres or MySQL source into the StarbaseDB
Durable Object SQLite, on a configurable per-table interval. Watermarks are
persisted in `_starbase_replication_watermarks` so each poll fetches only
rows past the last watermark, and every tick is recorded in
`_starbase_replication_log` for observability.

## Why a pull plugin

External data lives in a primary database somewhere far from the edge. This
plugin lets a StarbaseDB instance act as a close-to-edge read replica that
can be queried alongside (or instead of) the primary. It's pull-based so it
works for any source the host can reach over TCP, no per-provider push
infrastructure required.

## Configuration

Single env var: `REPLICATION_CONFIG_JSON`. Example:

```json
[
{
"source": "postgres",
"conn": "postgres://user:pass@host:5432/db",
"intervalSeconds": 300,
"tables": [
{
"name": "users",
"watermark": "updated_at",
"primaryKey": "id"
},
{ "name": "events", "watermark": "id" }
]
},
{
"source": "mysql",
"conn": "mysql://user:pass@host:3306/db",
"intervalSeconds": 60,
"tables": [
{
"name": "audit_log",
"watermark": "ts",
"target": "external_audit_log"
}
]
}
]
```

Field reference:

| Field | Required | Notes |
| --------------------- | -------- | ---------------------------------------------------------------------------------------------------- |
| `source` | yes | `postgres`, `mysql`, or `mock` (tests only). |
| `conn` | yes\* | Connection string. Required for built-in adapters. |
| `intervalSeconds` | yes | Poll cadence for this source. Per-table cadence inherits. |
| `pageSize` | no | Rows per round-trip (default 1000). |
| `tables[].name` | yes | Source table name. Postgres accepts `schema.table` (defaults to `public`). |
| `tables[].watermark` | yes | Column compared with `>` to pull only new rows. Must be monotonically non-decreasing for that table. Rows sharing a watermark value that straddles a page boundary can be skipped, so a strictly increasing column (e.g. an auto-increment id) is safest. |
| `tables[].primaryKey` | no | Column or array of columns. Enables `INSERT OR REPLACE` for upserts. If omitted, append-only. |
| `tables[].target` | no | Override the SQLite table name. Defaults to `name`. |

## Wiring

1. Ensure `[triggers]` is enabled in `wrangler.toml` with a cron schedule at
   least as frequent as the smallest `intervalSeconds` you configure
   (e.g. `crons = ["* * * * *"]`).
2. Set `REPLICATION_CONFIG_JSON` in `wrangler.toml`'s `[vars]` block (or as a
secret).
3. Deploy. The plugin self-registers via `src/index.ts` and the
`scheduled()` handler.

The plugin already participates in the `fetch()` plugin chain so admin
operators can:

- `POST /replication/run` — manually fire all due tables (admin token)
- `GET /replication/status` — read current watermarks (admin token)

That is the entire HTTP surface. There is no admin CRUD API, no web UI,
and no mutable runtime config — everything else is a `SELECT` against the
two replication tables, runnable through the normal `/query` endpoint.

## Failure semantics

If pulling one table errors, the watermark is **not** advanced for that
table, the failure is recorded in `_starbase_replication_log`, and other
tables in the same tick still run. The next tick re-attempts from the prior
watermark.

## Plugging in custom adapters

Pass `adapterFactory` to the plugin constructor to ship your own adapter
(e.g. SQL Server, ClickHouse, REST). It must implement
`ReplicationAdapter` from `plugins/replication/types.ts`.
110 changes: 110 additions & 0 deletions plugins/replication/adapters/mysql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { createConnection, type Connection } from 'mysql2/promise'
import type {
ColumnDef,
PullPage,
ReplicationAdapter,
SqlScalar,
} from '../types'

/**
 * Map a MySQL type name (information_schema.COLUMNS.DATA_TYPE) to a SQLite
 * affinity. Anything unrecognised falls back to TEXT, the safe default.
 */
function mysqlTypeToSqlite(t: string): ColumnDef['sqliteType'] {
    const tt = t.toLowerCase()
    // Match integer types by exact name: a substring test such as
    // `tt.includes('int')` would misclassify the spatial POINT type
    // ("point" contains "int") as INTEGER.
    const integerTypes = [
        'tinyint',
        'smallint',
        'mediumint',
        'int',
        'integer',
        'bigint',
        'bit',
        'bool',
        'boolean',
    ]
    if (integerTypes.includes(tt)) return 'INTEGER'
    if (
        tt.includes('decimal') ||
        tt.includes('numeric') ||
        tt.includes('float') ||
        tt.includes('double')
    ) {
        return 'REAL'
    }
    if (tt.includes('blob') || tt.includes('binary')) return 'BLOB'
    return 'TEXT'
}

export class MysqlAdapter implements ReplicationAdapter {
    // Created eagerly; a failed connect surfaces at the first await in
    // describe()/pull()/close() rather than in the constructor.
    private connPromise: Promise<Connection>

    constructor(connectionString: string) {
        this.connPromise = createConnection(connectionString)
    }

    private async conn() {
        return this.connPromise
    }

    /**
     * Describe the columns of `table` in the connection's default schema
     * (DATABASE()).
     *
     * @returns Column name, SQLite affinity and primary-key flag for every
     *          column, in ordinal order.
     * @throws Error if the table is not visible in the current schema.
     */
    async describe(table: string): Promise<ColumnDef[]> {
        const conn = await this.conn()
        const [rows] = (await conn.query(
            `SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY
            FROM information_schema.COLUMNS
            WHERE TABLE_NAME = ?
            AND TABLE_SCHEMA = DATABASE()
            ORDER BY ORDINAL_POSITION`,
            [table]
        )) as [
            {
                COLUMN_NAME: string
                DATA_TYPE: string
                COLUMN_KEY: string
            }[],
            unknown,
        ]

        if (rows.length === 0) {
            throw new Error(
                `replication: table "${table}" not found in source database`
            )
        }

        return rows.map((r) => ({
            name: r.COLUMN_NAME,
            sqliteType: mysqlTypeToSqlite(r.DATA_TYPE),
            primaryKey: r.COLUMN_KEY === 'PRI',
        }))
    }

    /**
     * Stream pages of rows whose watermark column is strictly greater than
     * the supplied watermark, ordered ascending. Yields one PullPage per
     * round-trip; a short or empty page ends the stream.
     */
    async *pull(opts: {
        table: string
        watermarkColumn: string
        watermark: SqlScalar | null
        pageSize: number
    }): AsyncIterable<PullPage> {
        const conn = await this.conn()
        // Backtick-quote identifiers, escaping embedded backticks.
        const tbl = `\`${opts.table.replace(/`/g, '``')}\``
        const wm = `\`${opts.watermarkColumn.replace(/`/g, '``')}\``

        let cursor = opts.watermark
        // eslint-disable-next-line no-constant-condition
        while (true) {
            const sql =
                cursor === null
                    ? `SELECT * FROM ${tbl} ORDER BY ${wm} ASC LIMIT ${Math.floor(opts.pageSize)}`
                    : `SELECT * FROM ${tbl} WHERE ${wm} > ? ORDER BY ${wm} ASC LIMIT ${Math.floor(opts.pageSize)}`

            const [rows] = (await conn.query(
                sql,
                cursor === null ? [] : [cursor]
            )) as [Record<string, SqlScalar>[], unknown]

            if (rows.length === 0) return

            const last = rows[rows.length - 1][opts.watermarkColumn] ?? null
            yield { rows, nextWatermark: last }

            // A NULL watermark can never advance the `> cursor` predicate
            // (MySQL sorts NULLs first under ASC), so continuing after a full
            // page ending in NULL would refetch the identical page forever.
            // Stop here; the caller's stored watermark is left unchanged.
            if (last === null) return

            cursor = last
            if (rows.length < opts.pageSize) return
        }
    }

    /** Gracefully close the underlying connection. */
    async close(): Promise<void> {
        const conn = await this.conn()
        await conn.end()
    }
}
133 changes: 133 additions & 0 deletions plugins/replication/adapters/postgres.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import postgres from 'postgres'
import type {
ColumnDef,
PullPage,
ReplicationAdapter,
SqlScalar,
} from '../types'

/**
* Map a Postgres type name (information_schema.columns.data_type) to a
* SQLite affinity. Anything we don't explicitly recognise becomes TEXT,
* which is the safe choice for SQLite (it stores all values as TEXT under
* the TEXT affinity rule).
*/
/**
 * Map a Postgres type name (information_schema.columns.data_type) to a
 * SQLite affinity. Anything we don't explicitly recognise becomes TEXT,
 * which is the safe choice for SQLite.
 */
function pgTypeToSqlite(pgType: string): ColumnDef['sqliteType'] {
    const t = pgType.toLowerCase()
    // Exact names only: a substring test such as `t.includes('int')` would
    // wrongly classify `interval` and the geometric `point` type as INTEGER.
    const integerTypes = [
        'smallint',
        'integer',
        'bigint',
        'int',
        'int2',
        'int4',
        'int8',
        'serial',
        'smallserial',
        'bigserial',
    ]
    if (integerTypes.includes(t)) return 'INTEGER'
    if (
        t.includes('numeric') ||
        t.includes('decimal') ||
        t.includes('real') ||
        t.includes('double') ||
        t === 'money'
    ) {
        return 'REAL'
    }
    if (t === 'boolean') return 'INTEGER'
    if (t === 'bytea') return 'BLOB'
    return 'TEXT'
}

export class PostgresAdapter implements ReplicationAdapter {
    private client: ReturnType<typeof postgres>

    constructor(connectionString: string) {
        // Keep the pool small — replication runs on Cloudflare Workers where
        // long-lived connections are an anti-pattern. fetch_types=false skips
        // a startup query that doesn't work with PgBouncer/Hyperdrive style
        // poolers.
        this.client = postgres(connectionString, {
            max: 2,
            fetch_types: false,
        })
    }

    /**
     * Describe the columns of `table` ("schema.table" or bare name, which
     * defaults to the public schema).
     *
     * @returns Column name, SQLite affinity and primary-key flag for every
     *          column, in ordinal order.
     * @throws Error if the table is not found in the resolved schema.
     */
    async describe(table: string): Promise<ColumnDef[]> {
        const parts = parseQualified(table)
        const rows = await this.client<
            { column_name: string; data_type: string }[]
        >`
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = ${parts.schema}
            AND table_name = ${parts.name}
            ORDER BY ordinal_position
        `

        if (rows.length === 0) {
            throw new Error(
                `replication: table "${table}" not found in source (schema=${parts.schema})`
            )
        }

        // Primary-key columns come from the pg_index catalog; the regclass
        // cast resolves the qualified name server-side.
        const pkRows = await this.client<{ column_name: string }[]>`
            SELECT a.attname AS column_name
            FROM pg_index i
            JOIN pg_attribute a
            ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = ${`${parts.schema}.${parts.name}`}::regclass
            AND i.indisprimary
        `
        const pkSet = new Set(pkRows.map((r) => r.column_name))

        return rows.map((r) => ({
            name: r.column_name,
            sqliteType: pgTypeToSqlite(r.data_type),
            primaryKey: pkSet.has(r.column_name),
        }))
    }

    /**
     * Stream pages of rows whose watermark column is strictly greater than
     * the supplied watermark, ordered ascending. Yields one PullPage per
     * round-trip; a short or empty page ends the stream.
     */
    async *pull(opts: {
        table: string
        watermarkColumn: string
        watermark: SqlScalar | null
        pageSize: number
    }): AsyncIterable<PullPage> {
        const parts = parseQualified(opts.table)
        const qualified = `"${parts.schema}"."${parts.name}"`
        const wmCol = `"${opts.watermarkColumn.replace(/"/g, '""')}"`
        // Floor before interpolating into SQL (matches MysqlAdapter) so a
        // fractional pageSize from config can't produce `LIMIT 10.5`.
        const limit = Math.floor(opts.pageSize)

        let cursor = opts.watermark
        // Loop pages until a short page is returned. Bounded by `pageSize`
        // and the natural end of the table; the `> cursor` predicate makes it
        // strictly progressing.
        // eslint-disable-next-line no-constant-condition
        while (true) {
            const rows: Record<string, SqlScalar>[] =
                cursor === null
                    ? await this.client.unsafe(
                          `SELECT * FROM ${qualified} ORDER BY ${wmCol} ASC LIMIT ${limit}`
                      )
                    : await this.client.unsafe(
                          `SELECT * FROM ${qualified} WHERE ${wmCol} > $1 ORDER BY ${wmCol} ASC LIMIT ${limit}`,
                          [cursor as any]
                      )

            if (rows.length === 0) return

            const last = rows[rows.length - 1][opts.watermarkColumn] ?? null
            yield { rows, nextWatermark: last }

            // A NULL watermark can never advance the `> cursor` predicate,
            // so continuing after a full page ending in NULL would refetch
            // the identical page forever. Stop here; the caller's stored
            // watermark is left unchanged. (Postgres sorts NULLs last under
            // ASC, so everything remaining is NULL anyway.)
            if (last === null) return

            cursor = last
            if (rows.length < opts.pageSize) return
        }
    }

    /** Close the pool, waiting up to 5 seconds for in-flight queries. */
    async close(): Promise<void> {
        await this.client.end({ timeout: 5 })
    }
}

/**
 * Split an optionally schema-qualified table name ("schema.table") into its
 * parts. A bare name defaults to the "public" schema; only the first dot is
 * treated as the separator.
 */
function parseQualified(table: string): { schema: string; name: string } {
    const dotAt = table.indexOf('.')
    return dotAt === -1
        ? { schema: 'public', name: table }
        : { schema: table.slice(0, dotAt), name: table.slice(dotAt + 1) }
}
Loading