Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions crates/core/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use powersync_sqlite_nostd as sqlite;
use powersync_sqlite_nostd::{Connection, Context};
use sqlite::ResultCode;

use crate::create_sqlite_optional_text_fn;
use crate::create_sqlite_text_fn;
use crate::error::PowerSyncError;
use crate::sync::BucketPriority;

fn powersync_client_id_impl(
ctx: *mut sqlite::context,
Expand Down Expand Up @@ -39,30 +37,6 @@ create_sqlite_text_fn!(
"powersync_client_id"
);

fn powersync_last_synced_at_impl(
ctx: *mut sqlite::context,
_args: &[*mut sqlite::value],
) -> Result<Option<String>, ResultCode> {
let db = ctx.db_handle();

// language=SQLite
let statement = db.prepare_v2("select last_synced_at from ps_sync_state where priority = ?")?;
statement.bind_int(1, BucketPriority::SENTINEL.into())?;

if statement.step()? == ResultCode::ROW {
let client_id = statement.column_text(0)?;
Ok(Some(client_id.to_string()))
} else {
Ok(None)
}
}

create_sqlite_optional_text_fn!(
powersync_last_synced_at,
powersync_last_synced_at_impl,
"powersync_last_synced_at"
);

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
db.create_function_v2(
"powersync_client_id",
Expand All @@ -74,16 +48,6 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
None,
None,
)?;
db.create_function_v2(
"powersync_last_synced_at",
0,
sqlite::UTF8 | sqlite::DETERMINISTIC,
None,
Some(powersync_last_synced_at),
None,
None,
None,
)?;

Ok(())
}
1 change: 0 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ mod pre_close_vtab;
mod schema;
mod state;
mod sync;
mod sync_local;
mod update_hooks;
mod utils;
mod uuid;
Expand Down
55 changes: 53 additions & 2 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec::Vec;

use powersync_sqlite_nostd as sqlite;
use powersync_sqlite_nostd::{self as sqlite, Destructor};
use powersync_sqlite_nostd::{Connection, Context};
use serde::Serialize;
use serde_json::json;
use sqlite::ResultCode;

use crate::error::{PSResult, PowerSyncError};
use crate::ext::SafeManagedStmt;
use crate::fix_data::apply_v035_fix;
use crate::schema::inspection::ExistingView;
use crate::sync::BucketPriority;

pub const LATEST_VERSION: i32 = 12;
pub const LATEST_VERSION: i32 = 13;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
Expand Down Expand Up @@ -424,5 +427,53 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 12')
local_db.exec_safe(stmt).into_db_result(local_db)?;
}

if current_version < 13 && target_version >= 13 {
let up = "\
UPDATE ps_stream_subscriptions SET expires_at = expires_at * 1000000, last_synced_at = last_synced_at * 1000000;
ALTER TABLE ps_sync_state RENAME TO ps_sync_state_old;
CREATE TABLE ps_sync_state (
priority INTEGER NOT NULL PRIMARY KEY,
last_synced_at INTEGER NOT NULL
) STRICT;
INSERT INTO ps_sync_state (priority, last_synced_at)
SELECT priority, unixepoch(last_synced_at) * 1000000 FROM ps_sync_state_old;
DROP TABLE ps_sync_state_old;
";
local_db.exec_safe(up).into_db_result(local_db)?;

const DOWN_STATEMENTS: &[&str] = &[
"UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1000000, last_synced_at = last_synced_at / 1000000",
"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new",
"CREATE TABLE ps_sync_state (
priority INTEGER NOT NULL PRIMARY KEY,
last_synced_at TEXT NOT NULL
) STRICT;",
"INSERT INTO ps_sync_state (priority, last_synced_at) SELECT priority, datetime(last_synced_at / 1000000, 'unixepoch') FROM ps_sync_state_new",
"DROP TABLE ps_sync_state_new",
"DELETE FROM ps_migration WHERE id >= 13",
];
let down = serialize_down_statements(DOWN_STATEMENTS)?;
let track_migration =
local_db.prepare_v2("INSERT INTO ps_migration(id, down_migrations) VALUES (?, ?)")?;
track_migration.bind_int(1, 13)?;
track_migration.bind_text(2, &down, Destructor::STATIC)?;
track_migration.exec()?;
}

Ok(())
}

fn serialize_down_statements(statements: &[&'static str]) -> Result<String, PowerSyncError> {
struct DownStatements<'a>(&'a [&'static str]);

impl<'a> Serialize for DownStatements<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_seq(self.0.iter().map(|s| json!({"sql": s})).into_iter())
}
}

serde_json::to_string(&DownStatements(statements)).map_err(PowerSyncError::internal)
}
25 changes: 23 additions & 2 deletions crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use powersync_sqlite_nostd::{self as sqlite, Context};
use sqlite::{Connection, ResultCode};

use crate::{
error::PowerSyncError,
schema::{InferredSchemaCache, Schema},
sync::SyncClient,
sync::{SyncClient, storage_adapter::StorageAdapter},
};

/// State that is shared for a SQLite database connection after the core extension has been
Expand All @@ -27,6 +28,7 @@ pub struct DatabaseState {
schema: RefCell<Option<Schema>>,
pending_updates: RefCell<BTreeSet<String>>,
commited_updates: RefCell<BTreeSet<String>>,
pub storage_adapter: RefCell<Option<Rc<StorageAdapter>>>,
pub sync_client: RefCell<Option<SyncClient>>,
/// Cached put and delete statements for raw tables, used by the `sync_local` step of the sync
/// client.
Expand Down Expand Up @@ -97,10 +99,29 @@ impl DatabaseState {
core::mem::replace(&mut *committed, Default::default())
}

pub fn storage_adapter(
&self,
db: *mut sqlite::sqlite3,
) -> Result<Rc<StorageAdapter>, PowerSyncError> {
let mut adapter = self.storage_adapter.borrow_mut();
Ok(match *adapter {
Some(ref adapter) => {
debug_assert!(db == adapter.db);
adapter.clone()
}
None => {
let created = Rc::new(StorageAdapter::new(db)?);
*adapter = Some(created.clone());
created
}
})
}

/// Releases global resources (like prepared statements for the sync client) referenced from
/// this state.
pub fn release_resources(&self) {
self.sync_client.replace(None);
self.sync_client.take();
self.storage_adapter.take();
}

/// ## Safety
Expand Down
27 changes: 15 additions & 12 deletions crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::error::PowerSyncError;
use crate::schema::Schema;
use crate::state::DatabaseState;
use crate::sync::diagnostics::{DiagnosticOptions, DiagnosticsEvent};
use crate::sync::storage_adapter::StorageAdapter;
use crate::sync::subscriptions::{StreamKey, apply_subscriptions};
use alloc::borrow::Cow;
use alloc::boxed::Box;
Expand Down Expand Up @@ -267,9 +266,10 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
}
}),
"subscriptions" => {
let adapter = state.storage_adapter(db)?;
let request = serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?;
return apply_subscriptions(db, request);
return apply_subscriptions(&adapter, request);
}
_ => {
return Err(PowerSyncError::argument_error("Unknown operation"));
Expand All @@ -278,14 +278,15 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()

let instructions = {
let mut client = state.sync_client.borrow_mut();

client
.get_or_insert_with(|| {
let client = match *client {
Some(ref mut client) => client,
None => {
let state = unsafe { DatabaseState::clone_from(ctx.user_data()) };

SyncClient::new(db, &state)
})
.push_event(event)
let created = SyncClient::new(db, &state)?;
client.insert(created)
}
};
client.push_event(event)
}?;

let formatted =
Expand Down Expand Up @@ -316,11 +317,11 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
"powersync_offline_sync_status",
0,
sqlite::UTF8 | sqlite::DIRECTONLY | SQLITE_RESULT_SUBTYPE,
None,
Some(Rc::into_raw(state.clone()) as *mut c_void),
Some(powersync_offline_sync_status),
None,
None,
None,
Some(DatabaseState::destroy_rc),
)?;

Ok(())
Expand All @@ -330,7 +331,9 @@ fn powersync_offline_sync_status_impl(
ctx: *mut sqlite::context,
_args: &[*mut sqlite::value],
) -> Result<String, PowerSyncError> {
let adapter = StorageAdapter::new(ctx.db_handle())?;
let db_state = unsafe { DatabaseState::from_context(&ctx) };
let adapter = db_state.storage_adapter(ctx.db_handle())?;

let state = adapter.offline_sync_state()?;
let serialized = serde_json::to_string(&state).map_err(PowerSyncError::internal)?;

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod operations;
pub mod storage_adapter;
mod streaming_sync;
mod subscriptions;
mod sync_local;
mod sync_status;

pub use bucket_priority::BucketPriority;
Expand Down
Loading
Loading