Skip to content

Commit 3cc34f0

Browse files
db: Add dual-mode TursoDb (local/sync) and wire sce sync push|pull
Extend TursoDb<M> to support Turso Cloud sync mode via turso::sync::Builder::new_remote() when both SCE_SYNC_URL and SCE_SYNC_TOKEN are set. Sync-mode methods (push/pull/checkpoint/stats) are explicit — no auto-push from execute(). Wire a user-invocable sce sync push|pull CLI command behind the new sync module (cli/src/services/sync/), connected through clap schema, command registry, and parse runtime dispatch. Remove the unused SYNC_PUSH_THRESHOLD_ENV_KEY constant and write-counter auto-push approach from the earlier design. Co-authored-by: SCE <sce@crocoder.dev>
1 parent 3c5cb71 commit 3cc34f0

14 files changed

Lines changed: 295 additions & 45 deletions

File tree

cli/src/cli_schema.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ pub const VERSION_CLAP_ABOUT: &str = "Print deterministic runtime version metada
3535
pub const VERSION_TOP_LEVEL_PURPOSE: &str = "Print deterministic runtime version metadata";
3636
pub const VERSION_SHOW_IN_TOP_LEVEL_HELP: bool = true;
3737

38+
pub const SYNC_CLAP_ABOUT: &str = "Push or pull changes to/from Turso Cloud";
39+
pub const SYNC_TOP_LEVEL_PURPOSE: &str = "Sync local database with Turso Cloud (push/pull)";
40+
pub const SYNC_SHOW_IN_TOP_LEVEL_HELP: bool = true;
41+
3842
pub const COMPLETION_CLAP_ABOUT: &str = "Generate deterministic shell completion scripts";
3943
pub const COMPLETION_TOP_LEVEL_PURPOSE: &str = "Generate deterministic shell completion scripts";
4044
pub const COMPLETION_SHOW_IN_TOP_LEVEL_HELP: bool = true;
@@ -75,6 +79,11 @@ pub const TOP_LEVEL_COMMANDS: &[TopLevelCommandMetadata] = &[
7579
purpose: COMPLETION_TOP_LEVEL_PURPOSE,
7680
show_in_top_level_help: COMPLETION_SHOW_IN_TOP_LEVEL_HELP,
7781
},
82+
TopLevelCommandMetadata {
83+
name: crate::services::sync::NAME,
84+
purpose: SYNC_TOP_LEVEL_PURPOSE,
85+
show_in_top_level_help: SYNC_SHOW_IN_TOP_LEVEL_HELP,
86+
},
7887
];
7988

8089
#[derive(Parser, Debug)]
@@ -190,6 +199,12 @@ pub enum Commands {
190199
#[arg(long, value_enum)]
191200
shell: CompletionShell,
192201
},
202+
203+
#[command(about = SYNC_CLAP_ABOUT, hide = !SYNC_SHOW_IN_TOP_LEVEL_HELP)]
204+
Sync {
205+
#[command(subcommand)]
206+
subcommand: SyncSubcommand,
207+
},
193208
}
194209

195210
#[derive(Subcommand, Debug, Clone, PartialEq, Eq)]
@@ -273,6 +288,15 @@ pub enum HooksSubcommand {
273288
DiffTrace,
274289
}
275290

291+
#[derive(Subcommand, Debug, Clone, PartialEq, Eq)]
292+
pub enum SyncSubcommand {
293+
#[command(about = "Push local changes to Turso Cloud")]
294+
Push,
295+
296+
#[command(about = "Pull remote changes from Turso Cloud")]
297+
Pull,
298+
}
299+
276300
#[derive(ValueEnum, Clone, Copy, Debug, Default, PartialEq, Eq)]
277301
pub enum OutputFormat {
278302
#[default]

cli/src/services/command_registry.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub fn build_default_registry() -> CommandRegistry {
104104
"completion",
105105
crate::services::completion::command::make_completion_command,
106106
);
107+
registry.register("sync", crate::services::sync::command::make_sync_command);
107108
registry
108109
}
109110

cli/src/services/config/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,10 @@ pub(crate) const ENV_ATTRIBUTION_HOOKS_ENABLED: &str = "SCE_ATTRIBUTION_HOOKS_EN
4444
// Sync (Turso Cloud) env var keys.
4545
// When SCE_SYNC_URL and SCE_SYNC_TOKEN are both set, TursoDb<M> opens in sync
4646
// (remote-backed) mode. When either is absent, local-only mode is used.
47-
// SCE_SYNC_PUSH_THRESHOLD controls how many writes accumulate before an
48-
// automatic push (defaults to 10 when unset).
4947
#[allow(dead_code)]
5048
pub(crate) const SYNC_URL_ENV_KEY: &str = "SCE_SYNC_URL";
5149
#[allow(dead_code)]
5250
pub(crate) const SYNC_TOKEN_ENV_KEY: &str = "SCE_SYNC_TOKEN";
53-
#[allow(dead_code)]
54-
pub(crate) const SYNC_PUSH_THRESHOLD_ENV_KEY: &str = "SCE_SYNC_PUSH_THRESHOLD";
5551

5652
const WORKOS_CLIENT_ID_ENV: &str = "WORKOS_CLIENT_ID";
5753
const WORKOS_CLIENT_ID_BAKED_DEFAULT: &str = "client_sce_default";

cli/src/services/db/mod.rs

Lines changed: 136 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,20 @@ fn sentence_case(value: &str) -> String {
139139
/// Wraps a Turso connection with a tokio current-thread runtime so callers can
140140
/// use synchronous `execute`/`query` methods while the underlying Turso API
141141
/// remains async.
142+
///
143+
/// Supports two modes:
144+
/// - **Local mode** (default): opens a plain Turso file database.
145+
/// - **Sync mode**: opens a Turso Cloud synced database when both
146+
/// `SCE_SYNC_URL` and `SCE_SYNC_TOKEN` environment variables are set.
147+
/// Sync operations (`push`, `pull`, `checkpoint`, `stats`) are available
148+
/// through explicit methods or the `sce sync` CLI command.
142149
#[allow(dead_code)]
143150
pub struct TursoDb<M: DbSpec> {
144151
conn: turso::Connection,
145152
runtime: tokio::runtime::Runtime,
153+
/// Optional sync database handle for Turso Cloud operations (push, pull,
154+
/// checkpoint, stats). `None` when the database is in local-only mode.
155+
sync_db: Option<turso::sync::Database>,
146156
spec: PhantomData<fn() -> M>,
147157
}
148158

@@ -152,6 +162,9 @@ impl<M: DbSpec> TursoDb<M> {
152162
///
153163
/// Parent directories are created automatically. Migrations are run after
154164
/// the database connection is established.
165+
///
166+
/// When both `SCE_SYNC_URL` and `SCE_SYNC_TOKEN` are set, the database opens
167+
/// in sync (Turso Cloud) mode. Otherwise, local-only mode is used.
155168
pub fn new() -> Result<Self> {
156169
let db_name = M::db_name();
157170
let db_path = M::db_path().with_context(|| format!("failed to resolve {db_name} path"))?;
@@ -173,33 +186,70 @@ impl<M: DbSpec> TursoDb<M> {
173186
format!("failed to create {db_name} tokio runtime. Try: rerun the command; if the issue persists, verify the local Tokio runtime environment.")
174187
})?;
175188

176-
let conn = runtime.block_on(async {
177-
let path_str = db_path.to_str().ok_or_else(|| {
178-
anyhow::anyhow!("invalid UTF-8 in database path: {}", db_path.display())
179-
})?;
180-
let db = turso::Builder::new_local(path_str)
181-
.build()
182-
.await
183-
.map_err(|e| {
184-
anyhow::anyhow!(
185-
"failed to open {db_name} database at {}: {e}",
186-
db_path.display()
187-
)
188-
})?;
189-
db.connect()
190-
.map_err(|e| anyhow::anyhow!("failed to connect to {db_name} database: {e}"))
189+
let sync_url = std::env::var(crate::services::config::SYNC_URL_ENV_KEY).ok();
190+
let sync_token = std::env::var(crate::services::config::SYNC_TOKEN_ENV_KEY).ok();
191+
192+
let path_str = db_path.to_str().ok_or_else(|| {
193+
anyhow::anyhow!("invalid UTF-8 in database path: {}", db_path.display())
191194
})?;
192195

193-
let db = Self {
194-
conn,
195-
runtime,
196-
spec: PhantomData,
197-
};
196+
if let (Some(url), Some(token)) = (sync_url, sync_token) {
197+
let (conn, sync_db) = runtime.block_on(async {
198+
let sync_db = turso::sync::Builder::new_remote(path_str)
199+
.with_remote_url(url)
200+
.with_auth_token(token)
201+
.build()
202+
.await
203+
.map_err(|e| {
204+
anyhow::anyhow!(
205+
"failed to open {db_name} synced database at {}: {e}",
206+
db_path.display()
207+
)
208+
})?;
209+
let conn = sync_db.connect().await.map_err(|e| {
210+
anyhow::anyhow!("failed to connect to {db_name} synced database: {e}")
211+
})?;
212+
Ok::<_, anyhow::Error>((conn, sync_db))
213+
})?;
214+
215+
let db = Self {
216+
conn,
217+
runtime,
218+
sync_db: Some(sync_db),
219+
spec: PhantomData,
220+
};
221+
222+
db.run_migrations()
223+
.with_context(|| format!("failed to run {db_name} migrations"))?;
224+
225+
Ok(db)
226+
} else {
227+
let conn = runtime.block_on(async {
228+
let db = turso::Builder::new_local(path_str)
229+
.build()
230+
.await
231+
.map_err(|e| {
232+
anyhow::anyhow!(
233+
"failed to open {db_name} database at {}: {e}",
234+
db_path.display()
235+
)
236+
})?;
237+
db.connect()
238+
.map_err(|e| anyhow::anyhow!("failed to connect to {db_name} database: {e}"))
239+
})?;
198240

199-
db.run_migrations()
200-
.with_context(|| format!("failed to run {db_name} migrations"))?;
241+
let db = Self {
242+
conn,
243+
runtime,
244+
sync_db: None,
245+
spec: PhantomData,
246+
};
201247

202-
Ok(db)
248+
db.run_migrations()
249+
.with_context(|| format!("failed to run {db_name} migrations"))?;
250+
251+
Ok(db)
252+
}
203253
}
204254

205255
/// Execute a SQL statement that does not return rows.
@@ -210,6 +260,10 @@ impl<M: DbSpec> TursoDb<M> {
210260
///
211261
/// # Returns
212262
/// Number of rows affected.
263+
///
264+
/// Sync is not triggered automatically — callers use the explicit `push()`
265+
/// or `pull()` methods (or the `sce sync` CLI command) to sync with the
266+
/// remote.
213267
pub fn execute(&self, sql: &str, params: impl turso::params::IntoParams) -> Result<u64> {
214268
self.runtime.block_on(async {
215269
self.conn
@@ -219,6 +273,65 @@ impl<M: DbSpec> TursoDb<M> {
219273
})
220274
}
221275

276+
/// Push local changes to the remote (sync mode only).
277+
///
278+
/// In local mode this is a no-op that returns `Ok(())`.
279+
pub fn push(&self) -> Result<()> {
280+
match self.sync_db {
281+
Some(ref sync_db) => self
282+
.runtime
283+
.block_on(sync_db.push())
284+
.map_err(|e| anyhow::anyhow!("{} sync push failed: {e}", M::db_name())),
285+
None => Ok(()),
286+
}
287+
}
288+
289+
/// Pull remote changes (sync mode only).
290+
///
291+
/// Returns `true` if any changes were applied to the local database.
292+
/// In local mode this is a no-op that returns `Ok(false)`.
293+
pub fn pull(&self) -> Result<bool> {
294+
match self.sync_db {
295+
Some(ref sync_db) => self
296+
.runtime
297+
.block_on(sync_db.pull())
298+
.map_err(|e| anyhow::anyhow!("{} sync pull failed: {e}", M::db_name())),
299+
None => Ok(false),
300+
}
301+
}
302+
303+
/// Force a WAL checkpoint (sync mode only).
304+
///
305+
/// In local mode this is a no-op that returns `Ok(())`.
306+
pub fn checkpoint(&self) -> Result<()> {
307+
match self.sync_db {
308+
Some(ref sync_db) => self
309+
.runtime
310+
.block_on(sync_db.checkpoint())
311+
.map_err(|e| anyhow::anyhow!("{} sync checkpoint failed: {e}", M::db_name())),
312+
None => Ok(()),
313+
}
314+
}
315+
316+
/// Retrieve sync statistics (sync mode only).
317+
///
318+
/// Returns `None` in local mode.
319+
pub fn stats(&self) -> Result<Option<turso::sync::DatabaseSyncStats>> {
320+
match self.sync_db {
321+
Some(ref sync_db) => self
322+
.runtime
323+
.block_on(sync_db.stats())
324+
.map(Some)
325+
.map_err(|e| anyhow::anyhow!("{} sync stats failed: {e}", M::db_name())),
326+
None => Ok(None),
327+
}
328+
}
329+
330+
/// Returns `true` if the database is in sync (Turso Cloud) mode.
331+
pub fn is_sync_mode(&self) -> bool {
332+
self.sync_db.is_some()
333+
}
334+
222335
/// Execute a SQL query that returns rows.
223336
///
224337
/// # Arguments

cli/src/services/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ pub mod resilience;
2323
pub mod security;
2424
pub mod setup;
2525
pub mod style;
26+
pub mod sync;
2627
pub mod token_storage;
2728
pub mod version;

cli/src/services/parse/command_runtime.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ fn render_missing_subcommand_help(args: &[String]) -> Option<RuntimeCommandHandl
139139
name: services::config::NAME.to_string(),
140140
text: cli_schema::render_help_for_path(&[services::config::NAME])?,
141141
})),
142+
services::sync::NAME => Some(Box::new(services::help::command::HelpTextCommand {
143+
name: services::sync::NAME.to_string(),
144+
text: cli_schema::render_help_for_path(&[services::sync::NAME])?,
145+
})),
142146
_ => None,
143147
}
144148
}
@@ -244,9 +248,24 @@ fn convert_clap_command(
244248
},
245249
}))
246250
}
251+
cli_schema::Commands::Sync { ref subcommand } => convert_sync_subcommand(subcommand),
247252
}
248253
}
249254

255+
#[allow(clippy::unnecessary_wraps)]
256+
fn convert_sync_subcommand(
257+
subcommand: &cli_schema::SyncSubcommand,
258+
) -> Result<RuntimeCommandHandle, ClassifiedError> {
259+
let subcommand = match subcommand {
260+
cli_schema::SyncSubcommand::Push => services::sync::SyncSubcommand::Push,
261+
cli_schema::SyncSubcommand::Pull => services::sync::SyncSubcommand::Pull,
262+
};
263+
264+
Ok(Box::new(services::sync::command::SyncCommand {
265+
subcommand,
266+
}))
267+
}
268+
250269
#[allow(clippy::unnecessary_wraps, clippy::needless_pass_by_value)]
251270
fn convert_auth_subcommand(
252271
subcommand: cli_schema::AuthSubcommand,

cli/src/services/sync/command.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use std::borrow::Cow;
2+
3+
use crate::app::AppContext;
4+
use crate::services::command_registry::{RuntimeCommand, RuntimeCommandHandle};
5+
use crate::services::error::ClassifiedError;
6+
use crate::services::sync;
7+
8+
pub struct SyncCommand {
9+
pub subcommand: sync::SyncSubcommand,
10+
}
11+
12+
impl RuntimeCommand for SyncCommand {
13+
fn name(&self) -> Cow<'_, str> {
14+
Cow::Borrowed(sync::NAME)
15+
}
16+
17+
fn execute(&self, _context: &AppContext) -> Result<String, ClassifiedError> {
18+
sync::run_sync(self.subcommand).map_err(|error| ClassifiedError::runtime(error.to_string()))
19+
}
20+
}
21+
22+
/// Construct a `SyncCommand` with the Push subcommand (used by the registry).
23+
#[allow(dead_code)]
24+
pub fn make_sync_command() -> RuntimeCommandHandle {
25+
Box::new(SyncCommand {
26+
subcommand: sync::SyncSubcommand::Push,
27+
})
28+
}

cli/src/services/sync/mod.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
pub mod command;
2+
3+
use anyhow::{Context, Result};
4+
5+
use crate::services::agent_trace_db::AgentTraceDb;
6+
7+
pub const NAME: &str = "sync";
8+
9+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
10+
pub enum SyncSubcommand {
11+
Push,
12+
Pull,
13+
}
14+
15+
/// Perform a push or pull sync operation on the Agent Trace database.
16+
///
17+
/// Opens the Agent Trace DB (which auto-detects sync mode from env vars).
18+
/// If the database is in sync mode, executes the requested operation.
19+
/// If in local mode, returns an error indicating sync is not configured.
20+
pub fn run_sync(subcommand: SyncSubcommand) -> Result<String> {
21+
let db = AgentTraceDb::new().context("failed to open Agent Trace DB for sync")?;
22+
23+
if !db.is_sync_mode() {
24+
return Err(anyhow::anyhow!(
25+
"Sync is not configured. Set SCE_SYNC_URL and SCE_SYNC_TOKEN to enable sync mode."
26+
));
27+
}
28+
29+
match subcommand {
30+
SyncSubcommand::Push => {
31+
db.push().with_context(|| "sync push failed")?;
32+
Ok("Pushed local changes to remote.".to_string())
33+
}
34+
SyncSubcommand::Pull => {
35+
let has_changes = db.pull().with_context(|| "sync pull failed")?;
36+
if has_changes {
37+
Ok("Pulled remote changes.".to_string())
38+
} else {
39+
Ok("No remote changes to pull.".to_string())
40+
}
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)