Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,6 @@ public/*

.dev.vars
.wrangler/

# Local PR drafting notes
PR_59_MESSAGE.md
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,41 @@ curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
</code>
</pre>

For large databases, use the async export flow to avoid request timeout limits:

<pre>
<code>
curl --location --request POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
--header 'Authorization: Bearer ABC123' \
--header 'Content-Type: application/json' \
--data '{"callbackUrl":"https://example.com/webhooks/export-complete"}'
</code>
</pre>

The above returns a `jobId`, `statusUrl`, and (when complete) a `downloadUrl`.
If `callbackUrl` is provided, StarbaseDB will POST completion/failure events and retry with backoff on transient failures.

Poll status:

<pre>
<code>
curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/JOB_ID_HERE' \
--header 'Authorization: Bearer ABC123'
</code>
</pre>

Download when complete:

<pre>
<code>
curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/JOB_ID_HERE/download' \
--header 'Authorization: Bearer ABC123' \
--output database_dump.sql
</code>
</pre>

If you bind an R2 bucket as `EXPORT_R2_BUCKET`, completed async dumps will be uploaded to R2 and streamed from there on download.

<h3>JSON Data Export</h3>
<pre>
<code>
Expand Down
194 changes: 194 additions & 0 deletions src/do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,66 @@ export class StarbaseDBDurableObject extends DurableObject {
"operator" TEXT DEFAULT '='
)`

const exportJobsStatement = `
CREATE TABLE IF NOT EXISTS tmp_export_jobs (
id TEXT PRIMARY KEY,
status TEXT NOT NULL CHECK(status IN ('processing', 'completed', 'failed')),
error TEXT,
callback_url TEXT,
callback_sent INTEGER NOT NULL DEFAULT 0,
callback_attempts INTEGER NOT NULL DEFAULT 0,
next_callback_retry_at INTEGER,
callback_host TEXT,
artifact_key TEXT,
artifact_provider TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
started_at INTEGER NOT NULL,
completed_at INTEGER,
current_table_index INTEGER NOT NULL DEFAULT 0,
current_offset INTEGER NOT NULL DEFAULT 0,
chunk_index INTEGER NOT NULL DEFAULT 0,
total_tables INTEGER NOT NULL DEFAULT 0
)`

const exportJobTablesStatement = `
CREATE TABLE IF NOT EXISTS tmp_export_job_tables (
job_id TEXT NOT NULL,
table_index INTEGER NOT NULL,
table_name TEXT NOT NULL,
PRIMARY KEY (job_id, table_index)
)`

const exportJobChunksStatement = `
CREATE TABLE IF NOT EXISTS tmp_export_job_chunks (
job_id TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
content TEXT,
PRIMARY KEY (job_id, chunk_index)
)`

this.executeQuery({ sql: cacheStatement })
this.executeQuery({ sql: allowlistStatement })
this.executeQuery({ sql: allowlistRejectedStatement })
this.executeQuery({ sql: rlsStatement })
this.executeQuery({ sql: exportJobsStatement })
this.executeQuery({ sql: exportJobTablesStatement })
this.executeQuery({ sql: exportJobChunksStatement })

this.ensureExportSchemaUpgrades()
}

/**
 * Applies additive schema upgrades for the async-export job table so that
 * Durable Objects created before the callback-retry feature gain the
 * `callback_attempts` and `next_callback_retry_at` columns. The CREATE TABLE
 * above already includes these columns, so on fresh instances every ALTER
 * is expected to fail with a "duplicate column" error — that specific
 * failure is ignored. Any OTHER failure is logged, because silently
 * swallowing it would hide a real migration problem (locked storage,
 * malformed SQL, etc.).
 *
 * Fire-and-forget by design: the upgrades are idempotent and nothing in the
 * constructor depends on their completion.
 */
private ensureExportSchemaUpgrades() {
    const upgradeStatements = [
        `ALTER TABLE tmp_export_jobs ADD COLUMN callback_attempts INTEGER NOT NULL DEFAULT 0;`,
        `ALTER TABLE tmp_export_jobs ADD COLUMN next_callback_retry_at INTEGER;`,
    ]

    for (const sql of upgradeStatements) {
        this.executeQuery({ sql }).catch((error: unknown) => {
            const message =
                error instanceof Error ? error.message : String(error)

            // "duplicate column" is the expected outcome on any instance
            // whose CREATE TABLE already defined the column — ignore it.
            if (!message.toLowerCase().includes('duplicate column')) {
                console.error(
                    'Unexpected failure applying export schema upgrade:',
                    sql,
                    error
                )
            }
        })
    }
}

init() {
Expand Down Expand Up @@ -106,6 +162,144 @@ export class StarbaseDBDurableObject extends DurableObject {

async alarm() {
try {
const now = Date.now()

const pendingExportJobs = (await this.executeQuery({
sql: `SELECT id, callback_host
FROM tmp_export_jobs
WHERE status = 'processing' AND callback_host IS NOT NULL
ORDER BY updated_at ASC
LIMIT 1;`,
isRaw: false,
})) as Record<string, SqlStorageValue>[]

if (pendingExportJobs.length) {
const job = pendingExportJobs[0]

try {
await fetch(`${job.callback_host}/export/dump/${job.id}`, {
method: 'GET',
headers: {
Authorization: `Bearer ${this.clientAuthToken}`,
'X-Starbase-Alarm': 'true',
},
})
} catch (error) {
console.error('Failed to continue export via alarm:', error)
}
}

const pendingCallbackJobs = (await this.executeQuery({
sql: `SELECT id, callback_host
FROM tmp_export_jobs
WHERE status IN ('completed', 'failed')
AND callback_url IS NOT NULL
AND callback_sent = 0
AND callback_host IS NOT NULL
AND (
next_callback_retry_at IS NULL
OR next_callback_retry_at <= ?
)
ORDER BY updated_at ASC
LIMIT 1;`,
params: [now],
isRaw: false,
})) as Record<string, SqlStorageValue>[]

if (pendingCallbackJobs.length) {
const callbackJob = pendingCallbackJobs[0]

try {
await fetch(
`${callbackJob.callback_host}/export/dump/${callbackJob.id}`,
{
method: 'GET',
headers: {
Authorization: `Bearer ${this.clientAuthToken}`,
'X-Starbase-Alarm': 'true',
},
}
)
} catch (error) {
console.error(
'Failed to continue callback retry via alarm:',
error
)
}
}

const staleExportJobs = (await this.executeQuery({
sql: `SELECT id
FROM tmp_export_jobs
WHERE status IN ('completed', 'failed')
AND completed_at IS NOT NULL
AND completed_at < ?
LIMIT 50;`,
params: [now - 7 * 24 * 60 * 60 * 1000],
isRaw: false,
})) as Record<string, SqlStorageValue>[]

for (const staleJob of staleExportJobs) {
const jobId = String(staleJob.id)

await this.executeQuery({
sql: `DELETE FROM tmp_export_job_chunks WHERE job_id = ?;`,
params: [jobId],
})
await this.executeQuery({
sql: `DELETE FROM tmp_export_job_tables WHERE job_id = ?;`,
params: [jobId],
})
await this.executeQuery({
sql: `DELETE FROM tmp_export_jobs WHERE id = ?;`,
params: [jobId],
})
}

const remainingExportJobs = (await this.executeQuery({
sql: `SELECT
(
SELECT COUNT(*)
FROM tmp_export_jobs
WHERE status = 'processing'
)
+
(
SELECT COUNT(*)
FROM tmp_export_jobs
WHERE status IN ('completed', 'failed')
AND callback_url IS NOT NULL
AND callback_sent = 0
AND (
next_callback_retry_at IS NULL
OR next_callback_retry_at <= ?
)
) AS count;`,
params: [now],
isRaw: false,
})) as Record<string, SqlStorageValue>[]

const activeExportCount = Number(remainingExportJobs[0]?.count || 0)

if (activeExportCount > 0) {
await this.setAlarm(Date.now() + 2000)
} else {
const nextRetryRow = (await this.executeQuery({
sql: `SELECT MIN(next_callback_retry_at) AS next_retry
FROM tmp_export_jobs
WHERE status IN ('completed', 'failed')
AND callback_url IS NOT NULL
AND callback_sent = 0
AND next_callback_retry_at IS NOT NULL;`,
isRaw: false,
})) as Record<string, SqlStorageValue>[]

const nextRetry = Number(nextRetryRow[0]?.next_retry || 0)
if (nextRetry > 0) {
await this.setAlarm(nextRetry)
}
}

// Fetch all the tasks that are marked to emit an event for this cycle.
const task = (await this.executeQuery({
sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
Expand Down
Loading