Skip to content

Commit 99e4850

Browse files
authored
Merge pull request #46 from powersync-ja/fix-existing-data
Fix existing data (remove dangling rows)
2 parents ebfbbef + c2d2ed0 commit 99e4850

15 files changed

+231
-15
lines changed

Cargo.lock

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ inherits = "release"
2424
inherits = "wasm"
2525

2626
[workspace.package]
27-
version = "0.3.5"
27+
version = "0.3.6"
2828
edition = "2021"
2929
authors = ["JourneyApps"]
3030
keywords = ["sqlite", "powersync"]

android/build.gradle.kts

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
group = "co.powersync"
9-
version = "0.3.5"
9+
version = "0.3.6"
1010
description = "PowerSync Core SQLite Extension"
1111

1212
repositories {

android/src/prefab/prefab.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"name": "powersync_sqlite_core",
33
"schema_version": 2,
44
"dependencies": [],
5-
"version": "0.3.5"
5+
"version": "0.3.6"
66
}

crates/core/src/fix035.rs

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use alloc::format;
2+
3+
use crate::error::{PSResult, SQLiteError};
4+
use sqlite_nostd as sqlite;
5+
use sqlite_nostd::{Connection, ResultCode};
6+
7+
use crate::ext::SafeManagedStmt;
8+
use crate::util::quote_identifier;
9+
10+
// Apply a data migration to fix any existing data affected by the issue
11+
// fixed in v0.3.5.
12+
//
13+
// The issue was that the `ps_updated_rows` table was not being populated
14+
// with remove operations in some cases. This causes the rows to be removed
15+
// from ps_oplog, but not from the ps_data__tables, resulting in dangling rows.
16+
//
17+
// The fix here is to find these dangling rows, and add them to ps_updated_rows.
18+
// The next time the sync_local operation is run, these rows will be removed.
19+
pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result<i64, SQLiteError> {
20+
// language=SQLite
21+
let statement = db
22+
.prepare_v2("SELECT name, powersync_external_table_name(name) FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'")
23+
.into_db_result(db)?;
24+
25+
while statement.step()? == ResultCode::ROW {
26+
let full_name = statement.column_text(0)?;
27+
let short_name = statement.column_text(1)?;
28+
let quoted = quote_identifier(full_name);
29+
30+
// language=SQLite
31+
let statement = db.prepare_v2(&format!(
32+
"
33+
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
34+
SELECT ?1, id FROM {}
35+
WHERE NOT EXISTS (
36+
SELECT 1 FROM ps_oplog
37+
WHERE row_type = ?1 AND row_id = {}.id
38+
);",
39+
quoted, quoted
40+
))?;
41+
statement.bind_text(1, short_name, sqlite::Destructor::STATIC)?;
42+
43+
statement.exec()?;
44+
}
45+
46+
Ok(1)
47+
}

crates/core/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod crud_vtab;
1717
mod diff;
1818
mod error;
1919
mod ext;
20+
mod fix035;
2021
mod kv;
2122
mod macros;
2223
mod migrations;

crates/core/src/migrations.rs

+20
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use sqlite_nostd as sqlite;
99
use sqlite_nostd::{Connection, Context};
1010

1111
use crate::error::{PSResult, SQLiteError};
12+
use crate::fix035::apply_v035_fix;
1213

1314
pub fn powersync_migrate(
1415
ctx: *mut sqlite::context,
@@ -283,5 +284,24 @@ VALUES(5,
283284
.into_db_result(local_db)?;
284285
}
285286

287+
if current_version < 6 && target_version >= 6 {
288+
if current_version != 0 {
289+
// Remove dangling rows, but skip if the database is created from scratch.
290+
apply_v035_fix(local_db)?;
291+
}
292+
293+
local_db
294+
.exec_safe(
295+
"\
296+
INSERT INTO ps_migration(id, down_migrations)
297+
VALUES(6,
298+
json_array(
299+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 6')
300+
));
301+
",
302+
)
303+
.into_db_result(local_db)?;
304+
}
305+
286306
Ok(())
287307
}

crates/core/src/operations.rs

-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
129129

130130
while supersede_statement.step()? == ResultCode::ROW {
131131
// Superseded (deleted) a previous operation, add the checksum
132-
let superseded_op = supersede_statement.column_int64(0)?;
133132
let supersede_checksum = supersede_statement.column_int(1)?;
134133
add_checksum = add_checksum.wrapping_add(supersede_checksum);
135134
op_checksum = op_checksum.wrapping_sub(supersede_checksum);

crates/core/src/view_admin.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ fn powersync_init_impl(
120120

121121
setup_internal_views(local_db)?;
122122

123-
powersync_migrate(ctx, 5)?;
123+
powersync_migrate(ctx, 6)?;
124124

125125
Ok(String::from(""))
126126
}

dart/test/migration_test.dart

+36
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import 'package:test/test.dart';
55

66
import 'utils/native_test_utils.dart';
77
import 'utils/migration_fixtures.dart' as fixtures;
8+
import 'utils/fix_035_fixtures.dart' as fix035;
89
import 'utils/schema.dart';
910

1011
void main() {
@@ -175,5 +176,40 @@ void main() {
175176
'${fixtures.expectedState[3]!.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schemaDown3.trim()}';
176177
expect(schema, equals(expected));
177178
});
179+
180+
test('migrate from 5 with broken data', () async {
181+
var tableSchema = {
182+
'tables': [
183+
{
184+
'name': 'lists',
185+
'columns': [
186+
{'name': 'description', 'type': 'TEXT'}
187+
]
188+
},
189+
{
190+
'name': 'todos',
191+
'columns': [
192+
{'name': 'description', 'type': 'TEXT'}
193+
]
194+
}
195+
]
196+
};
197+
db.select('select powersync_init()');
198+
db.select(
199+
'select powersync_replace_schema(?)', [jsonEncode(tableSchema)]);
200+
201+
db.select('select powersync_test_migration(5)');
202+
db.execute(fix035.dataBroken);
203+
204+
db.select('select powersync_init()');
205+
final data = getData(db);
206+
expect(data, equals(fix035.dataMigrated.trim()));
207+
208+
db.select('insert into powersync_operations(op, data) values(?, ?)',
209+
['sync_local', '']);
210+
211+
final data2 = getData(db);
212+
expect(data2, equals(fix035.dataFixed.trim()));
213+
});
178214
});
179215
}

dart/test/utils/fix_035_fixtures.dart

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/// Data with some records in actual tables but not in ps_oplog
2+
const dataBroken = '''
3+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
4+
(1, 'b1', 0, 0, 0, 0, 120, 0),
5+
(2, 'b2', 0, 0, 0, 0, 3, 0)
6+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
7+
(1, 1, 'todos', 't1', '', '{}', 100),
8+
(1, 2, 'todos', 't2', '', '{}', 20),
9+
(2, 3, 'lists', 'l1', '', '{}', 3)
10+
;INSERT INTO ps_data__lists(id, data) VALUES
11+
('l1', '{}'),
12+
('l3', '{}')
13+
;INSERT INTO ps_data__todos(id, data) VALUES
14+
('t1', '{}'),
15+
('t2', '{}'),
16+
('t3', '{}')
17+
''';
18+
19+
/// Data after applying the migration fix, but before sync_local
20+
const dataMigrated = '''
21+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
22+
(1, 'b1', 0, 0, 0, 0, 120, 0),
23+
(2, 'b2', 0, 0, 0, 0, 3, 0)
24+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
25+
(1, 1, 'todos', 't1', '', '{}', 100),
26+
(1, 2, 'todos', 't2', '', '{}', 20),
27+
(2, 3, 'lists', 'l1', '', '{}', 3)
28+
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
29+
('lists', 'l3'),
30+
('todos', 't3')
31+
;INSERT INTO ps_data__lists(id, data) VALUES
32+
('l1', '{}'),
33+
('l3', '{}')
34+
;INSERT INTO ps_data__todos(id, data) VALUES
35+
('t1', '{}'),
36+
('t2', '{}'),
37+
('t3', '{}')
38+
''';
39+
40+
/// Data after applying the migration fix and sync_local
41+
const dataFixed = '''
42+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
43+
(1, 'b1', 0, 0, 0, 0, 120, 0),
44+
(2, 'b2', 0, 0, 0, 0, 3, 0)
45+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
46+
(1, 1, 'todos', 't1', '', '{}', 100),
47+
(1, 2, 'todos', 't2', '', '{}', 20),
48+
(2, 3, 'lists', 'l1', '', '{}', 3)
49+
;INSERT INTO ps_data__lists(id, data) VALUES
50+
('l1', '{}')
51+
;INSERT INTO ps_data__todos(id, data) VALUES
52+
('t1', '{}'),
53+
('t2', '{}')
54+
''';

dart/test/utils/migration_fixtures.dart

+54-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// The current database version
2-
const databaseVersion = 5;
2+
const databaseVersion = 6;
33

44
/// This is the base database state that we expect at various schema versions.
55
/// Generated by loading the specific library version, and exporting the schema.
@@ -133,6 +133,45 @@ const expectedState = <int, String>{
133133
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
134134
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
135135
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
136+
''',
137+
6: r'''
138+
;CREATE TABLE ps_buckets(
139+
id INTEGER PRIMARY KEY,
140+
name TEXT NOT NULL,
141+
last_applied_op INTEGER NOT NULL DEFAULT 0,
142+
last_op INTEGER NOT NULL DEFAULT 0,
143+
target_op INTEGER NOT NULL DEFAULT 0,
144+
add_checksum INTEGER NOT NULL DEFAULT 0,
145+
op_checksum INTEGER NOT NULL DEFAULT 0,
146+
pending_delete INTEGER NOT NULL DEFAULT 0
147+
) STRICT
148+
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
149+
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
150+
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
151+
;CREATE TABLE ps_oplog(
152+
bucket INTEGER NOT NULL,
153+
op_id INTEGER NOT NULL,
154+
row_type TEXT,
155+
row_id TEXT,
156+
key TEXT,
157+
data TEXT,
158+
hash INTEGER NOT NULL) STRICT
159+
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
160+
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
161+
;CREATE TABLE ps_updated_rows(
162+
row_type TEXT,
163+
row_id TEXT,
164+
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
165+
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
166+
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
167+
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
168+
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
169+
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
170+
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
171+
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
172+
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
173+
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
174+
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
136175
'''
137176
};
138177

@@ -180,13 +219,24 @@ const data1 = <int, String>{
180219
(2, 3, 'lists', 'l1', '', '{}', 3)
181220
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
182221
('lists', 'l2')
222+
''',
223+
6: r'''
224+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
225+
(1, 'b1', 0, 0, 0, 0, 120, 0),
226+
(2, 'b2', 0, 0, 0, 1005, 3, 0)
227+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
228+
(1, 1, 'todos', 't1', '', '{}', 100),
229+
(1, 2, 'todos', 't2', '', '{}', 20),
230+
(2, 3, 'lists', 'l1', '', '{}', 3)
231+
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
232+
('lists', 'l2')
183233
'''
184234
};
185235

186236
/// data to test "down" migrations
187237
/// This is slightly different from the above,
188238
/// since we don't preserve all data in the migration process
189-
const dataDown1 = <int, String>{
239+
final dataDown1 = <int, String>{
190240
2: r'''
191241
;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES
192242
('$local', 0, 0, 9223372036854775807, 0, 1),
@@ -219,7 +269,8 @@ const dataDown1 = <int, String>{
219269
('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0),
220270
('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0),
221271
('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0)
222-
'''
272+
''',
273+
5: data1[5]!
223274
};
224275

225276
final finalData1 = data1[databaseVersion]!;

dart/test/utils/schema.dart

+8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ String getData(CommonDatabase db) {
6161
{
6262
'table': 'ps_updated_rows',
6363
'query': 'select * from ps_updated_rows order by row_type, row_id'
64+
},
65+
{
66+
'table': 'ps_data__lists',
67+
'query': 'select * from ps_data__lists order by id'
68+
},
69+
{
70+
'table': 'ps_data__todos',
71+
'query': 'select * from ps_data__todos order by id'
6472
}
6573
];
6674
List<String> result = [];

powersync-sqlite-core.podspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Pod::Spec.new do |s|
22
s.name = 'powersync-sqlite-core'
3-
s.version = '0.3.5'
3+
s.version = '0.3.6'
44
s.summary = 'PowerSync SQLite Extension'
55
s.description = <<-DESC
66
PowerSync extension for SQLite.

tool/build_xcframework.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ function createXcframework() {
2828
<key>MinimumOSVersion</key>
2929
<string>11.0</string>
3030
<key>CFBundleVersion</key>
31-
<string>0.3.5</string>
31+
<string>0.3.6</string>
3232
<key>CFBundleShortVersionString</key>
33-
<string>0.3.5</string>
33+
<string>0.3.6</string>
3434
</dict>
3535
</plist>
3636
EOF

0 commit comments

Comments
 (0)