Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions crates/core/src/operations_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ extern "C" fn connect(
}

extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int {
unsafe {
drop(Box::from_raw(vtab as *mut VirtualTable));
}
// Assume ownership of vtab since xDisconnect is supposed to destroy the connection.
let vtab = unsafe { Box::from_raw(vtab as *mut VirtualTable) };

// This is an eponymous virtual table. It will only be disconnected when the database is closed.
// So we can use this as a "pre-close" hook and ensure we clear prepared statements the core
// extension might hold.
vtab.state.release_resources();

ResultCode::OK as c_int
}

Expand Down Expand Up @@ -103,6 +108,11 @@ extern "C" fn update(
} else if op == "delete_bucket" {
let result = delete_bucket(db, args[3].text());
vtab_result(vtab, result)
} else if op == "noop" {
// We call this to ensure the table is added to an active database, ensuring that
// the disconnect callback runs before the database is closed (allowing us to free
// resources from the client state).
ResultCode::OK as c_int
} else {
ResultCode::MISUSE as c_int
}
Expand Down
9 changes: 8 additions & 1 deletion crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use alloc::{
use powersync_sqlite_nostd::{self as sqlite, Context};
use sqlite::{Connection, ResultCode};

use crate::schema::Schema;
use crate::{schema::Schema, sync::SyncClient};

/// State that is shared for a SQLite database connection after the core extension has been
/// registered on it.
Expand All @@ -24,6 +24,7 @@ pub struct DatabaseState {
schema: RefCell<Option<Schema>>,
pending_updates: RefCell<BTreeSet<String>>,
commited_updates: RefCell<BTreeSet<String>>,
pub sync_client: RefCell<Option<SyncClient>>,
}

impl DatabaseState {
Expand Down Expand Up @@ -90,6 +91,12 @@ impl DatabaseState {
core::mem::replace(&mut *committed, Default::default())
}

/// Releases global resources (like prepared statements for the sync client) referenced from
/// this state.
pub fn release_resources(&self) {
self.sync_client.replace(None);
}

/// ## Safety
///
/// This is only safe to call when an `Rc<DatabaseState>` has been installed as the `user_data`
Expand Down
68 changes: 36 additions & 32 deletions crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,26 +188,17 @@ pub struct BucketRequest {
pub after: String,
}

/// Wrapper around a [SyncClient].
///
/// We allocate one instance of this per database (in [register]) - the [SyncClient] has an initial
/// empty state that doesn't consume any resources.
struct SqlController {
client: SyncClient,
}

pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
extern "C" fn control(
ctx: *mut sqlite::context,
argc: c_int,
argv: *mut *mut sqlite::value,
) -> () {
let result = (|| -> Result<(), PowerSyncError> {
debug_assert!(!ctx.db_handle().get_autocommit());

let controller = unsafe { ctx.user_data().cast::<SqlController>().as_mut() }
.ok_or_else(|| PowerSyncError::unknown_internal())?;
let db = ctx.db_handle();
debug_assert!(!db.get_autocommit());

let state = unsafe { DatabaseState::from_context(&ctx) };
let args = sqlite::args!(argc, argv);
let [op, payload] = args else {
// This should be unreachable, we register the function with two arguments.
Expand All @@ -222,14 +213,24 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()

let op = op.text();
let event = match op {
"start" => SyncControlRequest::StartSyncStream({
if payload.value_type() == ColumnType::Text {
serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?
} else {
StartSyncStream::default()
}
}),
"start" => {
// Ensure the operations vtab exists. It's not actually used by the sync client,
// but we rely on that vtab being destroyed as a pre-close hook for the database
// connection to free statements preserved across multiple powersync_control
// invocations.
db.exec_safe(
"insert into powersync_operations (op, data) VALUES ('noop', null);",
)?;

SyncControlRequest::StartSyncStream({
if payload.value_type() == ColumnType::Text {
serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?
} else {
StartSyncStream::default()
}
})
}
"stop" => SyncControlRequest::StopSyncStream,
"line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine {
data: if payload.value_type() == ColumnType::Text {
Expand Down Expand Up @@ -267,14 +268,25 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
"subscriptions" => {
let request = serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?;
return apply_subscriptions(ctx.db_handle(), request);
return apply_subscriptions(db, request);
}
_ => {
return Err(PowerSyncError::argument_error("Unknown operation"));
}
};

let instructions = controller.client.push_event(event)?;
let instructions = {
let mut client = state.sync_client.borrow_mut();

client
.get_or_insert_with(|| {
let state = unsafe { DatabaseState::clone_from(ctx.user_data()) };

SyncClient::new(db, &state)
})
.push_event(event)
}?;

let formatted =
serde_json::to_string(&instructions).map_err(PowerSyncError::internal)?;
ctx.result_text_transient(&formatted);
Expand All @@ -288,23 +300,15 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
}
}

unsafe extern "C" fn destroy(ptr: *mut c_void) {
drop(unsafe { Box::from_raw(ptr.cast::<SqlController>()) });
}

let controller = Box::new(SqlController {
client: SyncClient::new(db, state),
});

db.create_function_v2(
"powersync_control",
2,
sqlite::UTF8 | sqlite::DIRECTONLY | SQLITE_RESULT_SUBTYPE,
Some(Box::into_raw(controller).cast()),
Some(Rc::into_raw(state.clone()) as *mut c_void),
Some(control),
None,
None,
Some(destroy),
Some(DatabaseState::destroy_rc),
)?;

db.create_function_v2(
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use bucket_priority::BucketPriority;
pub use checksum::Checksum;

use crate::state::DatabaseState;
pub use streaming_sync::SyncClient;

pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
interface::register(db, state)
Expand Down
23 changes: 14 additions & 9 deletions crates/core/src/sync/streaming_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use alloc::{
boxed::Box,
collections::{btree_map::BTreeMap, btree_set::BTreeSet},
format,
rc::Rc,
rc::{Rc, Weak},
string::{String, ToString},
vec::Vec,
};
Expand Down Expand Up @@ -51,16 +51,16 @@ use super::{
/// initialized.
pub struct SyncClient {
db: *mut sqlite::sqlite3,
db_state: Rc<DatabaseState>,
db_state: Weak<DatabaseState>,
/// The current [ClientState] (essentially an optional [StreamingSyncIteration]).
state: ClientState,
}

impl SyncClient {
pub fn new(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Self {
pub fn new(db: *mut sqlite::sqlite3, state: &Rc<DatabaseState>) -> Self {
Self {
db,
db_state: state,
db_state: Rc::downgrade(state),
state: ClientState::Idle,
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ impl SyncIterationHandle {
fn new(
db: *mut sqlite::sqlite3,
options: StartSyncStream,
state: Rc<DatabaseState>,
state: Weak<DatabaseState>,
) -> Result<Self, PowerSyncError> {
let runner = StreamingSyncIteration {
db,
Expand Down Expand Up @@ -224,7 +224,7 @@ impl<'a> ActiveEvent<'a> {

struct StreamingSyncIteration {
db: *mut sqlite::sqlite3,
state: Rc<DatabaseState>,
state: Weak<DatabaseState>,
adapter: StorageAdapter,
options: StartSyncStream,
status: SyncStatusContainer,
Expand Down Expand Up @@ -813,9 +813,14 @@ impl StreamingSyncIteration {
target: &OwnedCheckpoint,
priority: Option<BucketPriority>,
) -> Result<SyncLocalResult, PowerSyncError> {
let result =
self.adapter
.sync_local(&self.state, target, priority, &self.options.schema)?;
let state = match self.state.upgrade() {
Some(state) => state,
None => return Err(PowerSyncError::unknown_internal()),
};

let result = self
.adapter
.sync_local(&*state, target, priority, &self.options.schema)?;

if matches!(&result, SyncLocalResult::ChangesApplied) {
// Update affected stream subscriptions to mark them as synced.
Expand Down
18 changes: 18 additions & 0 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import 'package:path/path.dart';

import 'utils/native_test_utils.dart';
import 'utils/test_utils.dart';
import 'utils/tracking_vfs.dart';

void main() {
final vfs =
Expand Down Expand Up @@ -1375,6 +1376,23 @@ CREATE TRIGGER users_ref_delete
expect(db.select('select * from user_reference'), isEmpty);
});
});

test('can close database while iteration is active', () {
// The sync client caches prepared statements, we need to ensure those are
// freed when we close the connection since SQLite would keep files open
// otherwise.
final vfs = TrackingFileSystem(
parent: InMemoryFileSystem(), name: 'sync-test-cleanup');
sqlite3.registerVirtualFileSystem(vfs);
addTearDown(() => sqlite3.unregisterVirtualFileSystem(vfs));

db = openTestDatabase(vfs: vfs, fileName: '/test.db')
..select('select powersync_init();');
invokeControl('start', null);
expect(vfs.openFiles, isPositive);
db.dispose();
expect(vfs.openFiles, isZero);
});
}

final priorityBuckets = [
Expand Down
3 changes: 3 additions & 0 deletions dart/test/utils/tracking_vfs.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ final class TrackingFileSystem extends BaseVirtualFileSystem {
int tempWrites = 0;
int dataReads = 0;
int dataWrites = 0;
int openFiles = 0;

TrackingFileSystem({super.name = 'tracking', required this.parent});

Expand All @@ -29,6 +30,7 @@ final class TrackingFileSystem extends BaseVirtualFileSystem {
@override
XOpenResult xOpen(Sqlite3Filename path, int flags) {
final result = parent.xOpen(path, flags);
openFiles++;
return (
outFlags: result.outFlags,
file: TrackingFile(
Expand Down Expand Up @@ -85,6 +87,7 @@ class TrackingFile implements VirtualFileSystemFile {

@override
void xClose() {
vfs.openFiles--;
return parentFile.xClose();
}

Expand Down