diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index e300a5e..e1c9e6c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -1,9 +1,10 @@ #![no_std] -#![feature(vec_into_raw_parts)] #![allow(internal_features)] +#![feature(btree_set_entry)] #![feature(core_intrinsics)] #![feature(assert_matches)] #![feature(strict_overflow_ops)] +#![feature(vec_into_raw_parts)] extern crate alloc; @@ -33,6 +34,7 @@ mod schema; mod state; mod sync; mod sync_local; +mod update_hooks; mod util; mod uuid; mod version; @@ -79,6 +81,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> { crate::kv::register(db)?; crate::state::register(db, state.clone())?; sync::register(db, state.clone())?; + update_hooks::register(db, state.clone())?; crate::schema::register(db)?; crate::operations_vtab::register(db, state.clone())?; diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs index a6ee0b9..e46f87e 100644 --- a/crates/core/src/state.rs +++ b/crates/core/src/state.rs @@ -1,9 +1,14 @@ use core::{ + cell::RefCell, ffi::{c_int, c_void}, sync::atomic::{AtomicBool, Ordering}, }; -use alloc::sync::Arc; +use alloc::{ + collections::btree_set::BTreeSet, + string::{String, ToString}, + sync::Arc, +}; use sqlite::{Connection, ResultCode}; use sqlite_nostd::{self as sqlite, Context}; @@ -14,12 +19,16 @@ use sqlite_nostd::{self as sqlite, Context}; /// functions/vtabs that need access to it. pub struct DatabaseState { pub is_in_sync_local: AtomicBool, + pending_updates: RefCell>, + commited_updates: RefCell>, } impl DatabaseState { pub fn new() -> Self { DatabaseState { is_in_sync_local: AtomicBool::new(false), + pending_updates: Default::default(), + commited_updates: Default::default(), } } @@ -39,6 +48,30 @@ impl DatabaseState { ClearOnDrop(self) } + pub fn track_update(&self, tbl: &str) { + let mut set = self.pending_updates.borrow_mut(); + set.get_or_insert_with(tbl, str::to_string); + } + + pub fn track_rollback(&self) { + self.pending_updates.borrow_mut().clear(); + } + + pub fn track_commit(&self) { + let mut commited = self.commited_updates.borrow_mut(); + let mut pending = self.pending_updates.borrow_mut(); + let pending = core::mem::replace(&mut *pending, Default::default()); + + for pending in pending.into_iter() { + commited.insert(pending); + } + } + + pub fn take_updates(&self) -> BTreeSet { + let mut committed = self.commited_updates.borrow_mut(); + core::mem::replace(&mut *committed, Default::default()) + } + pub unsafe extern "C" fn destroy_arc(ptr: *mut c_void) { drop(unsafe { Arc::from_raw(ptr.cast::()) }); } diff --git a/crates/core/src/update_hooks.rs b/crates/core/src/update_hooks.rs new file mode 100644 index 0000000..0642a66 --- /dev/null +++ b/crates/core/src/update_hooks.rs @@ -0,0 +1,171 @@ +use core::{ + ffi::{CStr, c_char, c_int, c_void}, + ptr::null_mut, + sync::atomic::{AtomicBool, Ordering}, +}; + +use alloc::{boxed::Box, sync::Arc}; +use sqlite_nostd::{ + self as sqlite, Connection, Context, ResultCode, Value, bindings::SQLITE_RESULT_SUBTYPE, +}; + +use crate::{constants::SUBTYPE_JSON, error::PowerSyncError, state::DatabaseState}; + +/// The `powersync_update_hooks` methods works like this: +/// +/// 1. `powersync_update_hooks('install')` installs update hooks on the database, failing if +/// another hook already exists. +/// 2. `powersync_update_hooks('get')` returns a JSON array of table names that have been changed +/// and comitted since the last `powersync_update_hooks` call. +/// +/// The update hooks don't have to be uninstalled manually, that happens when the connection is +/// closed and the function is unregistered. +pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<(), ResultCode> { + let state = Box::new(HookState { + has_registered_hooks: AtomicBool::new(false), + db, + state, + }); + + db.create_function_v2( + "powersync_update_hooks", + 1, + sqlite::UTF8 | sqlite::DETERMINISTIC | SQLITE_RESULT_SUBTYPE, + Some(Box::into_raw(state) as *mut c_void), + Some(powersync_update_hooks), + None, + None, + Some(destroy_function), + )?; + Ok(()) +} + +struct HookState { + has_registered_hooks: AtomicBool, + db: *mut sqlite::sqlite3, + state: Arc, +} + +extern "C" fn destroy_function(ctx: *mut c_void) { + let state = unsafe { Box::from_raw(ctx as *mut HookState) }; + + if state.has_registered_hooks.load(Ordering::Relaxed) { + check_previous( + "update", + &state.state, + state.db.update_hook(None, null_mut()), + ); + check_previous( + "commit", + &state.state, + state.db.commit_hook(None, null_mut()), + ); + check_previous( + "rollback", + &state.state, + state.db.rollback_hook(None, null_mut()), + ); + } +} + +extern "C" fn powersync_update_hooks( + ctx: *mut sqlite::context, + argc: c_int, + argv: *mut *mut sqlite::value, +) { + let args = sqlite::args!(argc, argv); + let op = args[0].text(); + let db = ctx.db_handle(); + let user_data = ctx.user_data() as *const HookState; + + match op { + "install" => { + let state = unsafe { user_data.as_ref().unwrap_unchecked() }; + let db_state = &state.state; + + check_previous( + "update", + db_state, + db.update_hook( + Some(update_hook_impl), + Arc::into_raw(db_state.clone()) as *mut c_void, + ), + ); + check_previous( + "commit", + db_state, + db.commit_hook( + Some(commit_hook_impl), + Arc::into_raw(db_state.clone()) as *mut c_void, + ), + ); + check_previous( + "rollback", + db_state, + db.rollback_hook( + Some(rollback_hook_impl), + Arc::into_raw(db_state.clone()) as *mut c_void, + ), + ); + state.has_registered_hooks.store(true, Ordering::Relaxed); + } + "get" => { + let state = unsafe { user_data.as_ref().unwrap_unchecked() }; + let formatted = serde_json::to_string(&state.state.take_updates()) + .map_err(PowerSyncError::internal); + match formatted { + Ok(result) => { + ctx.result_text_transient(&result); + ctx.result_subtype(SUBTYPE_JSON); + } + Err(e) => e.apply_to_ctx("powersync_update_hooks", ctx), + } + } + _ => { + ctx.result_error("Unknown operation"); + ctx.result_error_code(ResultCode::MISUSE); + } + }; +} + +unsafe extern "C" fn update_hook_impl( + ctx: *mut c_void, + _kind: c_int, + _db: *const c_char, + table: *const c_char, + _rowid: i64, +) { + let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() }; + let table = unsafe { CStr::from_ptr(table) }; + let Ok(table) = table.to_str() else { + return; + }; + + state.track_update(table); +} + +unsafe extern "C" fn commit_hook_impl(ctx: *mut c_void) -> c_int { + let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() }; + state.track_commit(); + return 0; // Allow commit to continue normally +} + +unsafe extern "C" fn rollback_hook_impl(ctx: *mut c_void) { + let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() }; + state.track_rollback(); +} + +fn check_previous(desc: &'static str, expected: &Arc, previous: *const c_void) { + let expected = Arc::as_ptr(expected); + + assert!( + previous.is_null() || previous == expected.cast(), + "Previous call to {desc} hook outside of PowerSync: Expected {expected:p}, installed was {previous:p}", + ); + if !previous.is_null() { + // The hook callbacks own an Arc that needs to be dropped now. + unsafe { + Arc::decrement_strong_count(previous); + } + } +} diff --git a/dart/test/update_hooks_test.dart b/dart/test/update_hooks_test.dart new file mode 100644 index 0000000..50a26d5 --- /dev/null +++ b/dart/test/update_hooks_test.dart @@ -0,0 +1,63 @@ +import 'dart:convert'; + +import 'package:sqlite3/common.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; + +void main() { + late CommonDatabase db; + + setUp(() async { + db = openTestDatabase() + ..select('select powersync_init()') + ..execute('CREATE TABLE foo (bar INTEGER);') + ..select("SELECT powersync_update_hooks('install')"); + }); + + tearDown(() { + db.dispose(); + }); + + List collectUpdates() { + final [row] = db.select("SELECT powersync_update_hooks('get')"); + return (json.decode(row.values[0] as String) as List).cast(); + } + + test('is empty initially', () { + expect(collectUpdates(), isEmpty); + }); + + test('reports changed tables', () { + db.execute('INSERT INTO foo DEFAULT VALUES'); + expect(collectUpdates(), ['foo']); + }); + + test('deduplicates tables', () { + final stmt = db.prepare('INSERT INTO foo (bar) VALUES (?)'); + for (var i = 0; i < 1000; i++) { + stmt.execute([i]); + } + stmt.dispose(); + + expect(collectUpdates(), ['foo']); + }); + + test('does not report changes before end of transaction', () { + db.execute('BEGIN'); + db.execute('INSERT INTO foo DEFAULT VALUES'); + expect(collectUpdates(), isEmpty); + db.execute('COMMIT'); + + expect(collectUpdates(), ['foo']); + }); + + test('does not report rollbacks', () { + db.execute('BEGIN'); + db.execute('INSERT INTO foo DEFAULT VALUES'); + expect(collectUpdates(), isEmpty); + db.execute('ROLLBACK'); + + expect(collectUpdates(), isEmpty); + }); +} diff --git a/sqlite-rs-embedded b/sqlite-rs-embedded index f4bbd97..6a868dd 160000 --- a/sqlite-rs-embedded +++ b/sqlite-rs-embedded @@ -1 +1 @@ -Subproject commit f4bbd97a4714e5ab59062a60441978a00e3f87ad +Subproject commit 6a868dd8577b602a2bc9e150ddcd7f84a1c9c0e0