Skip to content

Commit 1e4fee9

Browse files
update watch implementation
1 parent abca0c9 commit 1e4fee9

File tree

4 files changed

+128
-72
lines changed

4 files changed

+128
-72
lines changed

Demo/PowerSyncExample/PowerSync/SystemManager.swift

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,17 @@ class SystemManager {
3636
func watchLists(_ callback: @escaping (_ lists: [ListContent]) -> Void ) async {
3737
do {
3838
for try await lists in try self.db.watch<ListContent>(
39-
sql: "SELECT * FROM \(LISTS_TABLE)",
40-
parameters: [],
41-
mapper: { cursor in
42-
try ListContent(
43-
id: cursor.getString(name: "id"),
44-
name: cursor.getString(name: "name"),
45-
createdAt: cursor.getString(name: "created_at"),
46-
ownerId: cursor.getString(name: "owner_id")
47-
)
48-
}
39+
options: WatchOptions(
40+
sql: "SELECT * FROM \(LISTS_TABLE)",
41+
mapper: { cursor in
42+
try ListContent(
43+
id: cursor.getString(name: "id"),
44+
name: cursor.getString(name: "name"),
45+
createdAt: cursor.getString(name: "created_at"),
46+
ownerId: cursor.getString(name: "owner_id")
47+
)
48+
}
49+
)
4950
) {
5051
callback(lists)
5152
}
@@ -55,10 +56,20 @@ class SystemManager {
5556
}
5657

5758
func insertList(_ list: NewListContent) async throws {
58-
_ = try await self.db.execute(
59-
sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",
60-
parameters: [list.name, connector.currentUserID]
59+
let id = try await self.db.get(sql: "select uuid() as uuid", parameters: [], mapper: {cursor in try cursor.getString(name: "uuid")})
60+
61+
let result = try await self.db.execute(
62+
sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (?, datetime(), ?, ?)",
63+
parameters: [id, list.name, connector.currentUserID]
6164
)
65+
66+
// insert 2000k todos
67+
// try await self.db.writeTransaction(callback: { tx in
68+
// for (_) in 0..<10000 {
69+
// try tx.execute(sql: "INSERT INTO \(TODOS_TABLE) (id, list_id, description) VALUES (uuid(), ?, ?)", parameters: [id, "Todo \(Int.random(in: 0..<1000))"])
70+
// }
71+
72+
// })
6273
}
6374

6475
func deleteList(id: String) async throws {
@@ -86,8 +97,8 @@ class SystemManager {
8697
listId: cursor.getString(name: "list_id"),
8798
photoId: cursor.getStringOptional(name: "photo_id"),
8899
description: cursor.getString(name: "description"),
89-
isComplete: cursor.getBoolean(name: "completed"),
90-
createdAt: cursor.getString(name: "created_at"),
100+
isComplete: cursor.getBooleanOptional(name: "completed") ?? false,
101+
createdAt: cursor.getStringOptional(name: "created_at"),
91102
completedAt: cursor.getStringOptional(name: "completed_at"),
92103
createdBy: cursor.getStringOptional(name: "created_by"),
93104
completedBy: cursor.getStringOptional(name: "completed_by")

Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift

Lines changed: 41 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -161,64 +161,58 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
161161
parameters: [Any]?,
162162
mapper: @escaping (SqlCursor) -> RowType
163163
) throws -> AsyncThrowingStream<[RowType], Error> {
164-
AsyncThrowingStream { continuation in
165-
Task {
166-
do {
167-
for await values in try self.kotlinDatabase.watch(
168-
sql: sql,
169-
parameters: parameters,
170-
mapper: mapper
171-
) {
172-
try continuation.yield(safeCast(values, to: [RowType].self))
173-
}
174-
continuation.finish()
175-
} catch {
176-
continuation.finish(throwing: error)
177-
}
178-
}
179-
}
164+
try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper))
180165
}
181166

182167
func watch<RowType>(
183168
sql: String,
184169
parameters: [Any]?,
185170
mapper: @escaping (SqlCursor) throws -> RowType
186171
) throws -> AsyncThrowingStream<[RowType], Error> {
187-
AsyncThrowingStream { continuation in
188-
Task {
189-
do {
190-
var mapperError: Error?
191-
for try await values in try self.kotlinDatabase.watch(
192-
sql: sql,
193-
parameters: parameters,
194-
mapper: { cursor in do {
195-
return try mapper(cursor)
196-
} catch {
197-
mapperError = error
198-
// The value here does not matter. We will throw the exception later
199-
// This is not ideal, this is only a workaround until we expose fine grained access to Kotlin SDK internals.
200-
return nil as RowType?
201-
} }
202-
) {
203-
if mapperError != nil {
204-
throw mapperError!
172+
try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper))
173+
}
174+
175+
func watch<RowType>(
176+
options: WatchOptions<RowType>
177+
) throws -> AsyncThrowingStream<[RowType], Error> {
178+
AsyncThrowingStream { continuation in
179+
Task {
180+
do {
181+
var mapperError: Error?
182+
for try await values in try self.kotlinDatabase.watch(
183+
sql: options.sql,
184+
parameters: options.parameters,
185+
throttleMs: KotlinLong(value: options.throttleMs),
186+
mapper: { cursor in do {
187+
return try options.mapper(cursor)
188+
} catch {
189+
mapperError = error
190+
// The value here does not matter. We will throw the exception later
191+
// This is not ideal, this is only a workaround until we expose fine grained access to Kotlin SDK internals.
192+
return nil as RowType?
193+
} }
194+
) {
195+
if mapperError != nil {
196+
throw mapperError!
197+
}
198+
try continuation.yield(safeCast(values, to: [RowType].self))
205199
}
206-
try continuation.yield(safeCast(values, to: [RowType].self))
200+
continuation.finish()
201+
} catch {
202+
continuation.finish(throwing: error)
207203
}
208-
continuation.finish()
209-
} catch {
210-
continuation.finish(throwing: error)
211204
}
212205
}
213206
}
214-
}
215-
216-
public func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
217-
return try safeCast(await kotlinDatabase.writeTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
218-
}
219-
220-
public func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
221-
return try safeCast(await kotlinDatabase.readTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
222-
}
207+
208+
209+
func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
210+
return try safeCast(await kotlinDatabase.writeTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
211+
}
212+
213+
func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
214+
return try safeCast(await kotlinDatabase.readTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
215+
}
216+
223217
}
224218

Sources/PowerSync/QueriesProtocol.swift

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,22 @@ import Foundation
22
import Combine
33
import PowerSyncKotlin
44

5+
public let DEFAULT_WATCH_THROTTLE_MS = Int64(30);
6+
7+
public struct WatchOptions<RowType> {
8+
public var sql: String
9+
public var parameters: [Any]
10+
public var throttleMs: Int64
11+
public var mapper: (SqlCursor) throws -> RowType
12+
13+
public init(sql: String, parameters: [Any]? = [], throttleMs: Int64? = DEFAULT_WATCH_THROTTLE_MS, mapper: @escaping (SqlCursor) throws -> RowType) {
14+
self.sql = sql
15+
self.parameters = parameters ?? [] // Default to empty array if nil
16+
self.throttleMs = throttleMs ?? DEFAULT_WATCH_THROTTLE_MS // Default to the constant if nil
17+
self.mapper = mapper;
18+
}
19+
}
20+
521
public protocol Queries {
622
/// Execute a write query (INSERT, UPDATE, DELETE)
723
/// Using `RETURNING *` will result in an error.
@@ -68,6 +84,10 @@ public protocol Queries {
6884
parameters: [Any]?,
6985
mapper: @escaping (SqlCursor) throws -> RowType
7086
) throws -> AsyncThrowingStream<[RowType], Error>
87+
88+
func watch<RowType>(
89+
options: WatchOptions<RowType>
90+
) throws -> AsyncThrowingStream<[RowType], Error>
7191

7292
/// Execute a write transaction with the given callback
7393
func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R
@@ -108,4 +128,27 @@ extension Queries {
108128
) throws -> AsyncThrowingStream<[RowType], Error> {
109129
return try watch(sql: sql, parameters: [], mapper: mapper)
110130
}
131+
132+
133+
/// Execute a read-only (SELECT) query every time the source tables are modified
134+
/// and return the results as an array in a Publisher.
135+
func watch<RowType>(
136+
sql: String,
137+
parameters: [Any]?,
138+
mapper: @escaping (SqlCursor) -> RowType
139+
) throws -> AsyncThrowingStream<[RowType], Error> {
140+
return try watch(sql: sql, parameters: [], mapper: mapper)
141+
}
142+
143+
/// Execute a read-only (SELECT) query every time the source tables are modified
144+
/// and return the results as an array in a Publisher.
145+
func watch<RowType>(
146+
sql: String,
147+
parameters: [Any]?,
148+
mapper: @escaping (SqlCursor) throws -> RowType
149+
) throws -> AsyncThrowingStream<[RowType], Error> {
150+
return try watch(sql: sql, parameters: [], mapper: mapper)
151+
}
152+
153+
111154
}

Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,13 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
198198

199199
// Create an actor to handle concurrent mutations
200200
actor ResultsStore {
201-
private var results: [[String]] = []
201+
private var results: Set<String> = []
202202

203203
func append(_ names: [String]) {
204-
results.append(names)
204+
results.formUnion(names)
205205
}
206206

207-
func getResults() -> [[String]] {
207+
func getResults() -> Set<String> {
208208
results
209209
}
210210

@@ -213,17 +213,20 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
213213
}
214214
}
215215

216+
216217
let resultsStore = ResultsStore()
217218

218219
let stream = try database.watch(
219-
sql: "SELECT name FROM users ORDER BY id",
220-
parameters: nil
221-
) { cursor in
222-
cursor.getString(index: 0)!
223-
}
220+
options: WatchOptions(
221+
sql: "SELECT name FROM users ORDER BY id",
222+
mapper: { cursor in
223+
cursor.getString(index: 0)!
224+
}
225+
))
224226

225227
let watchTask = Task {
226228
for try await names in stream {
229+
print(names)
227230
await resultsStore.append(names)
228231
if await resultsStore.count() == 2 {
229232
expectation.fulfill()
@@ -240,13 +243,18 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
240243
sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
241244
parameters: ["2", "User 2", "[email protected]"]
242245
)
246+
243247

244248
await fulfillment(of: [expectation], timeout: 5)
245249
watchTask.cancel()
246250

247251
let finalResults = await resultsStore.getResults()
248-
XCTAssertEqual(finalResults.count, 2)
249-
XCTAssertEqual(finalResults[1], ["User 1", "User 2"])
252+
// The count of invocations here can vary a lot depending on the order of execution
253+
// In some cases the creation of the users can fire before the initial watched query
254+
// has emitted a result.
255+
// However the watched query should always emit the latest result set.
256+
XCTAssertLessThanOrEqual(finalResults.count, 3)
257+
XCTAssertEqual(finalResults, ["User 1", "User 2"])
250258
}
251259

252260
func testWatchError() async throws {

0 commit comments

Comments
 (0)