diff --git a/.gitignore b/.gitignore
index 1a87a994..de000360 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,6 @@ coverage
 datastore.nims
 nimcache
 TODO
+nim.cfg
+nimble.develop
+nimble.paths
diff --git a/config.nims b/config.nims
index 052a576c..ed3a5315 100644
--- a/config.nims
+++ b/config.nims
@@ -10,3 +10,7 @@ when (NimMajor, NimMinor) == (1, 2):
 when (NimMajor, NimMinor) > (1, 2):
   switch("hint", "XCannotRaiseY:off")
+
+# begin Nimble config (version 2)
+when withDir(thisDir(), system.fileExists("nimble.paths")):
+  include "nimble.paths"
+# end Nimble config
diff --git a/datastore.nim b/datastore.nim
index 6d43a209..5a971a5c 100644
--- a/datastore.nim
+++ b/datastore.nim
@@ -1,7 +1,7 @@
 import ./datastore/datastore
 import ./datastore/fsds
-import ./datastore/sql
+# import ./datastore/sql
 import ./datastore/mountedds
 import ./datastore/tieredds
 
-export datastore, fsds, mountedds, tieredds, sql
+export datastore, fsds, mountedds, tieredds
diff --git a/datastore.nimble b/datastore.nimble
index 6252c6d1..3b5b28ef 100644
--- a/datastore.nimble
+++ b/datastore.nimble
@@ -6,14 +6,18 @@ author = "Status Research & Development GmbH"
 description = "Simple, unified API for multiple data stores"
 license = "Apache License 2.0 or MIT"
 
-requires "nim >= 1.2.0",
+requires "nim >= 1.6.14",
   "asynctest >= 0.3.1 & < 0.4.0",
-  "chronos",
+  "chronos#0277b65be2c7a365ac13df002fba6e172be55537",
   "questionable >= 0.10.3 & < 0.11.0",
   "sqlite3_abi",
   "stew",
   "unittest2",
-  "upraises >= 0.1.0 & < 0.2.0"
+  "pretty",
+  "threading",
+  "taskpools",
+  "upraises >= 0.1.0 & < 0.2.0",
+  "chronicles"
 
 task coverage, "generates code coverage report":
   var (output, exitCode) = gorgeEx("which lcov")
diff --git a/datastore/backend.nim b/datastore/backend.nim
new file mode 100644
index 00000000..5790b121
--- /dev/null
+++ b/datastore/backend.nim
@@ -0,0 +1,67 @@
+import std/algorithm
+import std/options
+
+import pkg/questionable/results
+
+import ./threads/databuffer
+import ./types
+
+export databuffer, types, SortOrder
+
+## Types for datastore backends.
+##
+## These should be synchronous and work with both GC types
+## and DataBuffers. This makes it easier to make them threadsafe.
+##
+
+type
+  DbQueryResponse*[K, V] = tuple[key: Option[K], data: V]
+
+  DbQuery*[K] = object
+    key*: K          # Key to be queried
+    value*: bool     # Flag to indicate if data should be returned
+    limit*: int      # Max items to return - not available in all backends
+    offset*: int     # Offset from which to start querying - not available in all backends
+    sort*: SortOrder # Sort order - not available in all backends
+
+  KeyId* = object
+    ## serialized Key ID, equivalent to `key.id()`
+    data*: DataBuffer
+
+  ## Accepted backend key and value types
+  DbKey* = string | KeyId
+  DbVal* = seq[byte] | DataBuffer
+
+  DbBatchEntry*[K, V] = tuple[key: K, data: V]
+
+  DbQueryHandle*[K, V, T] = object
+    query*: DbQuery[K]
+    cancel*: bool
+    closed*: bool
+    env*: T
+
+proc dbQuery*[K](
+  key: K,
+  value = false,
+  sort = SortOrder.Ascending,
+  offset = 0,
+  limit = -1
+): DbQuery[K] =
+  DbQuery[K](key: key, value: value, sort: sort, offset: offset, limit: limit)
+
+proc `$`*(id: KeyId): string = $(id.data)
+
+proc toKey*(tp: typedesc[KeyId], id: cstring): KeyId = KeyId.new(id)
+proc toKey*(tp: typedesc[string], id: cstring): string = $(id)
+
+template toVal*(tp: typedesc[DataBuffer], id: openArray[byte]): DataBuffer = DataBuffer.new(id)
+template toVal*(tp: typedesc[seq[byte]], id: openArray[byte]): seq[byte] = @(id)
+
+proc new*(tp: typedesc[KeyId], id: cstring): KeyId =
+  KeyId(data: DataBuffer.new(id.toOpenArray(0, id.len()-1)))
+
+proc new*(tp: typedesc[KeyId], id: string): KeyId =
+  KeyId(data: DataBuffer.new(id))
+
+template toOpenArray*(x: DbKey): openArray[char] =
+  x.data.toOpenArray(char)
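The types above are enough to phrase a backend query without touching any concrete store. A minimal sketch of the intended call pattern (illustrative only, not part of the patch):

    import pkg/datastore/backend

    let
      key = KeyId.new "/a/b"            # serialized key, backed by a DataBuffer
      q   = dbQuery(key, value = true)  # also return values; default sort/offset

    assert $q.key == "/a/b"
    assert q.limit == -1                # -1 means no limit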
diff --git a/datastore/datastore.nim b/datastore/datastore.nim
index d7fa45fd..c0f1f0d1 100644
--- a/datastore/datastore.nim
+++ b/datastore/datastore.nim
@@ -13,32 +13,37 @@ push: {.upraises: [].}
 type
   BatchEntry* = tuple[key: Key, data: seq[byte]]
 
-method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} =
+method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown", raises: [].} =
   raiseAssert("Not implemented!")
 
-method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown".} =
+method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown", raises: [].} =
   raiseAssert("Not implemented!")
 
-method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown".} =
+method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown", raises: [].} =
   raiseAssert("Not implemented!")
 
-method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown".} =
+method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown", raises: [].} =
   raiseAssert("Not implemented!")
 
-method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, locks: "unknown".} =
+method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, locks: "unknown", raises: [].} =
   raiseAssert("Not implemented!")
 
-method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown".} =
+method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown", raises: [].} =
   raiseAssert("Not implemented!")
 
-method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
+method close*(self: Datastore): Future[?!void] {.base, locks: "unknown", raises: [].} =
   raiseAssert("Not implemented!")
 
-method query*(
-  self: Datastore,
-  query: Query): Future[?!QueryIter] {.base, gcsafe.} =
+method query*(self: Datastore,
+              query: Query
+             ): Future[?!QueryIter] {.base, gcsafe, raises: [].} =
   raiseAssert("Not implemented!")
 
-proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
+method queryIter*(self: Datastore,
+                  query: Query
+                 ): ?!(iterator(): ?!QueryResponse) {.base, gcsafe, raises: [].} =
+  raiseAssert("Not implemented!")
+
+proc contains*(self: Datastore, key: Key): Future[bool] {.async, raises: [].} =
   return (await self.has(key)) |? false
diff --git a/datastore/fsds.nim b/datastore/fsds.nim
index 163a52ab..65f5cdd4 100644
--- a/datastore/fsds.nim
+++ b/datastore/fsds.nim
@@ -155,6 +155,9 @@ method put*(
   return success()
 
 proc dirWalker(path: string): iterator: string {.gcsafe.} =
+  var localPath {.threadvar.}: string
+
+  localPath = path
   return iterator(): string =
     try:
       for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
@@ -188,10 +191,23 @@ method query*(
   var
     iter = QueryIter.new()
 
+  var lock = newAsyncLock() # serialize querying under threads
   proc next(): Future[?!QueryResponse] {.async.} =
+    defer:
+      if lock.locked:
+        lock.release()
+
+    if lock.locked:
+      return failure (ref DatastoreError)(msg: "Should always await query futures")
+
     let
       path = walker()
 
+    if iter.finished:
+      return failure "iterator is finished"
+
+    await lock.acquire()
+
     if finished(walker):
       iter.finished = true
       return success (Key.none, EmptyBytes)
diff --git a/datastore/query.nim b/datastore/query.nim
index 1c51a6e2..256ad97e 100644
--- a/datastore/query.nim
+++ b/datastore/query.nim
@@ -1,27 +1,24 @@
 import pkg/upraises
 
+import std/algorithm
 import pkg/chronos
 import pkg/questionable
 import pkg/questionable/results
 
 import ./key
 import ./types
+import ./backend
+
+export types
+export options, SortOrder
 
 type
-  SortOrder* {.pure.} = enum
-    Assending,
-    Descending
-
-  Query* = object
-    key*: Key         # Key to be queried
-    value*: bool      # Flag to indicate if data should be returned
-    limit*: int       # Max items to return - not available in all backends
-    offset*: int      # Offset from which to start querying - not available in all backends
-    sort*: SortOrder  # Sort order - not available in all backends
+  ## Front end types
+  Query* = DbQuery[Key]
 
-  QueryResponse* = tuple[key: ?Key, data: seq[byte]]
-  QueryEndedError* = object of DatastoreError
+  QueryResponse* = DbQueryResponse[Key, seq[byte]]
 
-  GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.}
+  GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe.}
   IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.}
   QueryIter* = ref object
     finished*: bool
@@ -38,17 +35,13 @@ proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} =
 proc new*(T: type QueryIter, dispose = defaultDispose): T =
   QueryIter(dispose: dispose)
 
-proc init*(
-  T: type Query,
-  key: Key,
-  value = true,
-  sort = SortOrder.Assending,
-  offset = 0,
-  limit = -1): T =
-
-  T(
-    key: key,
-    value: value,
-    sort: sort,
-    offset: offset,
-    limit: limit)
+proc init*(T: type Query,
+           key: Key,
+           value = true,
+           sort = SortOrder.Ascending,
+           offset = 0,
+           limit = -1): Query =
+  dbQuery[Key](key, value, sort, offset, limit)
+
+proc toKey*(key: KeyId): Key {.inline, raises: [].} =
+  Key.init($key.data).expect("expected valid key here but got `" & $key.data & "`")
diff --git a/datastore/sql.nim b/datastore/sql.nim
index 9c30c9e5..28b1f3d0 100644
--- a/datastore/sql.nim
+++ b/datastore/sql.nim
@@ -1,3 +1,102 @@
+import std/times
+import std/options
+
+import pkg/chronos
+import pkg/questionable
+import pkg/questionable/results
+import pkg/sqlite3_abi
+from pkg/stew/results as stewResults import isErr
+import pkg/upraises
+
+import std/sequtils
+import ../datastore
+import ./backend
 import ./sql/sqliteds
 
-export sqliteds
+export datastore, sqliteds
+
+push: {.upraises: [].}
+
+type
+  SQLiteDatastore* = ref object of Datastore
+    db: SQLiteBackend[KeyId, DataBuffer]
+
+proc path*(self: SQLiteDatastore): string =
+  self.db.path()
+
+proc readOnly*(self: SQLiteDatastore): bool =
+  self.db.readOnly()
+
+method has*(self: SQLiteDatastore,
+            key: Key): Future[?!bool] {.async.} =
+  return self.db.has(KeyId.new key.id())
+
+method delete*(self: SQLiteDatastore,
+               key: Key): Future[?!void] {.async.} =
+  return self.db.delete(KeyId.new key.id())
+
+method delete*(self: SQLiteDatastore,
+               keys: seq[Key]): Future[?!void] {.async.} =
+  let dkeys = keys.mapIt(KeyId.new it.id())
+  return self.db.delete(dkeys)
+
+method get*(self: SQLiteDatastore,
+            key: Key): Future[?!seq[byte]] {.async.} =
+  self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]:
+    d.toSeq()
+
+method put*(self: SQLiteDatastore,
+            key: Key,
+            data: seq[byte]): Future[?!void] {.async.} =
+  self.db.put(KeyId.new key.id(), DataBuffer.new data)
+
+method put*(self: SQLiteDatastore,
+            batch: seq[BatchEntry]): Future[?!void] {.async.} =
+  var dbatch: seq[tuple[key: KeyId, data: DataBuffer]]
+  for entry in batch:
+    dbatch.add((KeyId.new entry.key.id(), DataBuffer.new entry.data))
+  self.db.put(dbatch)
+
+method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
+  self.db.close()
+
+method queryIter*(
+    self: SQLiteDatastore,
+    query: Query
+): ?!(iterator(): ?!QueryResponse) =
+
+  let dbquery = dbQuery(
+    key = KeyId.new query.key.id(),
+    value = query.value,
+    limit = query.limit,
+    offset = query.offset,
+    sort = query.sort,
+  )
+  var qhandle = ? self.db.query(dbquery)
+
+  let iter = iterator(): ?!QueryResponse =
+    for resp in qhandle.iter():
+      without qres =? resp, err:
+        yield QueryResponse.failure err
+      let k = qres.key.map() do(k: KeyId) -> Key:
+        Key.init($k).expect("valid key")
+      let v: seq[byte] = qres.data.toSeq()
+      yield success (k, v)
+
+  success iter
+
+proc new*(
+  T: type SQLiteDatastore,
+  path: string,
+  readOnly = false): ?!SQLiteDatastore =
+
+  success SQLiteDatastore(
+    db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly))
+
+proc new*(
+  T: type SQLiteDatastore,
+  db: SQLiteBackend[KeyId, DataBuffer]): ?!T =
+
+  success T(db: db)
diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim
index aa632747..feb0785b 100644
--- a/datastore/sql/sqliteds.nim
+++ b/datastore/sql/sqliteds.nim
@@ -1,55 +1,53 @@
 import std/times
 import std/options
 
-import pkg/chronos
 import pkg/questionable
 import pkg/questionable/results
 import pkg/sqlite3_abi
 from pkg/stew/results as stewResults import isErr
 import pkg/upraises
 
-import ../datastore
+import ../backend
 import ./sqlitedsdb
 
-export datastore, sqlitedsdb
+export backend, sqlitedsdb
 
 push: {.upraises: [].}
 
 type
-  SQLiteDatastore* = ref object of Datastore
-    readOnly: bool
-    db: SQLiteDsDb
+  SQLiteBackend*[K: DbKey, V: DbVal] = object
+    db: SQLiteDsDb[K, V]
 
-proc path*(self: SQLiteDatastore): string =
-  self.db.dbPath
+proc path*[K,V](self: SQLiteBackend[K,V]): string =
+  $self.db.dbPath
 
-proc `readOnly=`*(self: SQLiteDatastore): bool
-  {.error: "readOnly should not be assigned".}
+proc readOnly*[K,V](self: SQLiteBackend[K,V]): bool = self.db.readOnly
 
 proc timestamp*(t = epochTime()): int64 =
   (t * 1_000_000).int64
 
-method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
+proc has*[K,V](self: SQLiteBackend[K,V], key: K): ?!bool =
   var
     exists = false
+    key = key
 
   proc onData(s: RawStmtPtr) =
     exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool
 
-  if err =? self.db.containsStmt.query((key.id), onData).errorOption:
+  if err =? self.db.containsStmt.query((key), onData).errorOption:
     return failure err
 
   return success exists
 
-method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
-  return self.db.deleteStmt.exec((key.id))
+proc delete*[K,V](self: SQLiteBackend[K,V], key: K): ?!void =
+  return self.db.deleteStmt.exec((key))
 
-method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
+proc delete*[K,V](self: SQLiteBackend[K,V], keys: openArray[K]): ?!void =
   if err =? self.db.beginStmt.exec().errorOption:
     return failure(err)
 
   for key in keys:
-    if err =? self.db.deleteStmt.exec((key.id)).errorOption:
+    if err =? self.db.deleteStmt.exec((key)).errorOption:
       if err =? self.db.rollbackStmt.exec().errorOption:
         return failure err.msg
@@ -60,35 +58,36 @@ method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.}
 
   return success()
 
-method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
+proc get*[K,V](self: SQLiteBackend[K,V], key: K): ?!V =
   # see comment in ./filesystem_datastore re: finer control of memory
-  # allocation in `method get`, could apply here as well if bytes were read
+  # allocation in `proc get`, could apply here as well if bytes were read
   # incrementally with `sqlite3_blob_read`
 
-  var
-    bytes: seq[byte]
+  var bytes: V
 
   proc onData(s: RawStmtPtr) =
-    bytes = self.db.getDataCol()
+    bytes = dataCol[V](self.db.getDataCol)
 
-  if err =? self.db.getStmt.query((key.id), onData).errorOption:
+  if err =? self.db.getStmt.query((key), onData).errorOption:
    return failure(err)
 
   if bytes.len <= 0:
     return failure(
-      newException(DatastoreKeyNotFound, "Key doesn't exist"))
+      newException(DatastoreKeyNotFound, "key doesn't exist"))
 
   return success bytes
 
-method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
-  return self.db.putStmt.exec((key.id, data, timestamp()))
+proc put*[K,V](self: SQLiteBackend[K,V], key: K, data: V): ?!void =
+  return self.db.putStmt.exec((key, data, timestamp()))
 
-method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
+proc put*[K,V](self: SQLiteBackend[K,V], batch: openArray[DbBatchEntry[K,V]]): ?!void =
   if err =? self.db.beginStmt.exec().errorOption:
     return failure err
 
   for entry in batch:
-    if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption:
+    let putStmt = self.db.putStmt
+    let item = (entry.key, entry.data, timestamp())
+    if err =? putStmt.exec(item).errorOption:
       if err =? self.db.rollbackStmt.exec().errorOption:
         return failure err
@@ -99,25 +98,26 @@ method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.asy
 
   return success()
 
-method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
+proc close*[K,V](self: SQLiteBackend[K,V]): ?!void =
   self.db.close()
 
   return success()
 
-method query*(
-  self: SQLiteDatastore,
-  query: Query): Future[?!QueryIter] {.async.} =
+proc query*[K,V](
+  self: SQLiteBackend[K,V],
+  query: DbQuery[K]
+): Result[DbQueryHandle[K,V,RawStmtPtr], ref CatchableError] =
   var
-    iter = QueryIter()
     queryStr = if query.value:
         QueryStmtDataIdStr
       else:
         QueryStmtIdStr
 
-  if query.sort == SortOrder.Descending:
+  case query.sort:
+  of Descending:
     queryStr &= QueryStmtOrderDescending
-  else:
+  of Ascending:
     queryStr &= QueryStmtOrderAscending
 
   if query.limit != 0:
@@ -127,14 +127,14 @@ method query*(
     queryStr &= QueryStmtOffset
 
   let
-    queryStmt = QueryStmt.prepare(
-      self.db.env, queryStr).expect("should not fail")
+    queryStmt = ? QueryStmt.prepare(self.db.env, queryStr)
     s = RawStmtPtr(queryStmt)
+    queryKey = $query.key & "*"
 
   var
     v = sqlite3_bind_text(
-      s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE)
+      s, 1.cint, queryKey.cstring, queryKey.len().cint, SQLITE_TRANSIENT_GCSAFE)
 
   if not (v == SQLITE_OK):
     return failure newException(DatastoreError, $sqlite3_errstr(v))
@@ -151,25 +151,28 @@ method query*(
   if not (v == SQLITE_OK):
     return failure newException(DatastoreError, $sqlite3_errstr(v))
 
-  proc next(): Future[?!QueryResponse] {.async.} =
-    if iter.finished:
-      return failure(newException(QueryEndedError, "Calling next on a finished query!"))
+  success DbQueryHandle[K,V,RawStmtPtr](query: query, env: s)
+
+proc close*[K,V](handle: var DbQueryHandle[K,V,RawStmtPtr]) =
+  if not handle.closed:
+    handle.closed = true
+    discard sqlite3_reset(handle.env)
+    discard sqlite3_clear_bindings(handle.env)
+    handle.env.dispose()
+
+iterator iter*[K, V](handle: var DbQueryHandle[K, V, RawStmtPtr]): ?!DbQueryResponse[K, V] =
+  while not handle.cancel:
-    let
-      v = sqlite3_step(s)
+    let v = sqlite3_step(handle.env)
 
     case v
     of SQLITE_ROW:
       let
-        key = Key.init(
-          $sqlite3_column_text_not_null(s, QueryStmtIdCol))
-          .expect("should not fail")
+        key = K.toKey(sqlite3_column_text_not_null(handle.env, QueryStmtIdCol))
         blob: ?pointer =
-          if query.value:
-            sqlite3_column_blob(s, QueryStmtDataCol).some
-          else:
-            pointer.none
+          if handle.query.value: sqlite3_column_blob(handle.env, QueryStmtDataCol).some
+          else: pointer.none
 
       # detect out-of-memory error
       # see the conversion table and final paragraph of:
@@ -180,58 +183,51 @@ method query*(
       # error it is necessary to check that the result is a null pointer and
       # that the result code is an error code
       if blob.isSome and blob.get().isNil:
-        let
-          v = sqlite3_errcode(sqlite3_db_handle(s))
+        let v = sqlite3_errcode(sqlite3_db_handle(handle.env))
 
         if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
-          iter.finished = true
-          return failure newException(DatastoreError, $sqlite3_errstr(v))
+          handle.cancel = true
+          yield DbQueryResponse[K,V].failure newException(DatastoreError, $sqlite3_errstr(v))
 
       let
-        dataLen = sqlite3_column_bytes(s, QueryStmtDataCol)
-        data = if blob.isSome:
-            @(
-              toOpenArray(cast[ptr UncheckedArray[byte]](blob.get),
-                0,
-                dataLen - 1))
+        dataLen = sqlite3_column_bytes(handle.env, QueryStmtDataCol)
+        data =
+          if blob.isSome:
+            let arr = cast[ptr UncheckedArray[byte]](blob.get)
+            V.toVal(arr.toOpenArray(0, dataLen-1))
           else:
-            @[]
+            var empty: array[0, byte]
+            V.toVal(empty.toOpenArray(0,-1))
 
-      return success (key.some, data)
+      yield success (key.some, data)
     of SQLITE_DONE:
-      iter.finished = true
-      return success (Key.none, EmptyBytes)
+      handle.close()
+      break
     else:
-      iter.finished = true
-      return failure newException(DatastoreError, $sqlite3_errstr(v))
+      handle.cancel = true
+      yield DbQueryResponse[K,V].failure newException(DatastoreError, $sqlite3_errstr(v))
+      break
+
+  handle.close()
 
-  iter.dispose = proc(): Future[?!void] {.async.} =
-    discard sqlite3_reset(s)
-    discard sqlite3_clear_bindings(s)
-    s.dispose
-    return success()
+proc contains*[K,V](self: SQLiteBackend[K,V], key: K): bool =
+  return self.has(key).get()
 
-  iter.next = next
-  return success iter
 
-proc new*(
-  T: type SQLiteDatastore,
-  path: string,
-  readOnly = false): ?!T =
+proc newSQLiteBackend*[K,V](
+  path: string,
+  readOnly = false): ?!SQLiteBackend[K,V] =
 
   let
     flags =
       if readOnly: SQLITE_OPEN_READONLY
      else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
 
-  success T(
-    db: ? SQLiteDsDb.open(path, flags),
-    readOnly: readOnly)
+  success SQLiteBackend[K,V](db: ? SQLiteDsDb[K,V].open(path, flags))
 
-proc new*(
-  T: type SQLiteDatastore,
-  db: SQLiteDsDb): ?!T =
+proc newSQLiteBackend*[K,V](
+  db: SQLiteDsDb[K,V]): ?!SQLiteBackend[K,V] =
 
-  success T(
-    db: db,
-    readOnly: db.readOnly)
+  success SQLiteBackend[K,V](db: db)
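With the generic backend in place, the synchronous API can be exercised directly, without chronos. A rough sketch, modeled on the tests at the bottom of this diff (`Memory` is the in-memory path constant from sqlitedsdb):

    import pkg/questionable/results
    import pkg/datastore/sql/sqliteds

    let
      db  = newSQLiteBackend[KeyId, DataBuffer](path = Memory).tryGet()
      key = KeyId.new "/a/b"

    db.put(key, DataBuffer.new "hello").tryGet()
    assert db.has(key).tryGet()

    var handle = db.query(dbQuery(key = KeyId.new "/a", value = true)).tryGet()
    for res in handle.iter():            # the iterator closes the handle when done
      let (k, v) = res.tryGet()
      if k.isSome: echo $k.get(), " = ", $v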
diff --git a/datastore/sql/sqlitedsdb.nim b/datastore/sql/sqlitedsdb.nim
index 503dea45..ae381e1e 100644
--- a/datastore/sql/sqlitedsdb.nim
+++ b/datastore/sql/sqlitedsdb.nim
@@ -4,36 +4,37 @@ import pkg/questionable
 import pkg/questionable/results
 import pkg/upraises
 
+import ../backend
 import ./sqliteutils
 
 export sqliteutils
 
 type
   BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].}
-  BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].}
+  BoundDataCol* = proc (): DataBuffer {.closure, gcsafe, upraises: [].}
   BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].}
 
   # feels odd to use `void` for prepared statements corresponding to SELECT
   # queries but it fits with the rest of the SQLite wrapper adapted from
   # status-im/nwaku, at least in its current form in ./sqlite
-  ContainsStmt* = SQLiteStmt[(string), void]
-  DeleteStmt* = SQLiteStmt[(string), void]
-  GetStmt* = SQLiteStmt[(string), void]
-  PutStmt* = SQLiteStmt[(string, seq[byte], int64), void]
+  ContainsStmt*[K] = SQLiteStmt[(K), void]
+  DeleteStmt*[K] = SQLiteStmt[(K), void]
+  GetStmt*[K] = SQLiteStmt[(K), void]
+  PutStmt*[K, V] = SQLiteStmt[(K, V, int64), void]
   QueryStmt* = SQLiteStmt[(string), void]
   BeginStmt* = NoParamsStmt
   EndStmt* = NoParamsStmt
   RollbackStmt* = NoParamsStmt
 
-  SQLiteDsDb* = object
+  SQLiteDsDb*[K: DbKey, V: DbVal] = object
     readOnly*: bool
-    dbPath*: string
-    containsStmt*: ContainsStmt
-    deleteStmt*: DeleteStmt
+    dbPath*: DataBuffer
+    containsStmt*: ContainsStmt[K]
+    deleteStmt*: DeleteStmt[K]
     env*: SQLite
-    getDataCol*: BoundDataCol
-    getStmt*: GetStmt
-    putStmt*: PutStmt
+    getDataCol*: (RawStmtPtr, int)
+    getStmt*: GetStmt[K]
+    putStmt*: PutStmt[K,V]
     beginStmt*: BeginStmt
     endStmt*: EndStmt
     rollbackStmt*: RollbackStmt
@@ -156,37 +157,38 @@ proc idCol*(
   return proc (): string =
     $sqlite3_column_text_not_null(s, index.cint)
 
-proc dataCol*(
-  s: RawStmtPtr,
-  index: int): BoundDataCol =
+proc dataCol*[V: DbVal](data: (RawStmtPtr, int)): V =
+
+  let s = data[0]
+  let index = data[1]
 
   checkColMetadata(s, index, DataColName)
 
-  return proc (): seq[byte] =
+  let
+    i = index.cint
+    blob = sqlite3_column_blob(s, i)
+
+  # detect out-of-memory error
+  # see the conversion table and final paragraph of:
+  # https://www.sqlite.org/c3ref/column_blob.html
+  # see also https://www.sqlite.org/rescode.html
+
+  # the "data" column can be NULL so in order to detect an out-of-memory error
+  # it is necessary to check that the result is a null pointer and that the
+  # result code is an error code
+  if blob.isNil:
     let
-      i = index.cint
-      blob = sqlite3_column_blob(s, i)
-
-    # detect out-of-memory error
-    # see the conversion table and final paragraph of:
-    # https://www.sqlite.org/c3ref/column_blob.html
-    # see also https://www.sqlite.org/rescode.html
+      v = sqlite3_errcode(sqlite3_db_handle(s))
 
-    # the "data" column can be NULL so in order to detect an out-of-memory error
-    # it is necessary to check that the result is a null pointer and that the
-    # result code is an error code
-    if blob.isNil:
-      let
-        v = sqlite3_errcode(sqlite3_db_handle(s))
+    if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
+      raise (ref Defect)(msg: $sqlite3_errstr(v))
 
-      if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
-        raise (ref Defect)(msg: $sqlite3_errstr(v))
-
-    let
-      dataLen = sqlite3_column_bytes(s, i)
-      dataBytes = cast[ptr UncheckedArray[byte]](blob)
+  let
+    dataLen = sqlite3_column_bytes(s, i)
+    dataBytes = cast[ptr UncheckedArray[byte]](blob)
 
-    @(toOpenArray(dataBytes, 0, dataLen - 1))
+  # copy data out, since sqlite will free it
+  V.toVal(toOpenArray(dataBytes, 0, dataLen - 1))
 
 proc timestampCol*(
   s: RawStmtPtr,
@@ -211,7 +213,7 @@ proc getDBFilePath*(path: string): ?!string =
   except CatchableError as exc:
     return failure(exc.msg)
 
-proc close*(self: SQLiteDsDb) =
+proc close*[K, V](self: SQLiteDsDb[K, V]) =
   self.containsStmt.dispose
   self.getStmt.dispose
   self.beginStmt.dispose
@@ -226,10 +228,10 @@ proc close*(self: SQLiteDsDb) =
 
   self.env.dispose
 
-proc open*(
-  T: type SQLiteDsDb,
+proc open*[K,V](
+  T: type SQLiteDsDb[K,V],
   path = Memory,
-  flags = SQLITE_OPEN_READONLY): ?!SQLiteDsDb =
+  flags = SQLITE_OPEN_READONLY): ?!SQLiteDsDb[K, V] =
 
   # make it optional to enable WAL with it enabled being the default?
 
@@ -262,10 +264,10 @@ proc open*(
   checkExec(pragmaStmt)
 
   var
-    containsStmt: ContainsStmt
-    deleteStmt: DeleteStmt
-    getStmt: GetStmt
-    putStmt: PutStmt
+    containsStmt: ContainsStmt[K]
+    deleteStmt: DeleteStmt[K]
+    getStmt: GetStmt[K]
+    putStmt: PutStmt[K,V]
     beginStmt: BeginStmt
     endStmt: EndStmt
     rollbackStmt: RollbackStmt
@@ -273,10 +275,10 @@ proc open*(
   if not readOnly:
     checkExec(env.val, CreateStmtStr)
 
-    deleteStmt = ? DeleteStmt.prepare(
+    deleteStmt = ? DeleteStmt[K].prepare(
       env.val, DeleteStmtStr, SQLITE_PREPARE_PERSISTENT)
 
-    putStmt = ? PutStmt.prepare(
+    putStmt = ? PutStmt[K,V].prepare(
       env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT)
 
     beginStmt = ? BeginStmt.prepare(
@@ -288,10 +290,10 @@ proc open*(
     rollbackStmt = ? RollbackStmt.prepare(
       env.val, RollbackTransactionStr, SQLITE_PREPARE_PERSISTENT)
 
-  containsStmt = ? ContainsStmt.prepare(
+  containsStmt = ? ContainsStmt[K].prepare(
     env.val, ContainsStmtStr, SQLITE_PREPARE_PERSISTENT)
 
-  getStmt = ? GetStmt.prepare(
+  getStmt = ? GetStmt[K].prepare(
    env.val, GetStmtStr, SQLITE_PREPARE_PERSISTENT)
 
   # if a readOnly/existing database does not satisfy the expected schema
   # "SQL logic error"
 
   let
-    getDataCol = dataCol(RawStmtPtr(getStmt), GetStmtDataCol)
+    getDataCol = (RawStmtPtr(getStmt), GetStmtDataCol)
 
-  success T(
+  success SQLiteDsDb[K,V](
     readOnly: readOnly,
-    dbPath: path,
+    dbPath: DataBuffer.new path,
     containsStmt: containsStmt,
     deleteStmt: deleteStmt,
     env: env.release,
diff --git a/datastore/sql/sqliteutils.nim b/datastore/sql/sqliteutils.nim
index bffb6e8c..884527b7 100644
--- a/datastore/sql/sqliteutils.nim
+++ b/datastore/sql/sqliteutils.nim
@@ -3,6 +3,8 @@ import pkg/questionable/results
 import pkg/sqlite3_abi
 import pkg/upraises
 
+import ../backend
+
 export sqlite3_abi
 
 # Adapted from:
@@ -20,7 +22,7 @@ type
   AutoDisposed*[T: ptr|ref] = object
     val*: T
 
-  DataProc* = proc(s: RawStmtPtr) {.closure, gcsafe.}
+  DataProc* = proc(s: RawStmtPtr) {.gcsafe.}
 
   NoParams* = tuple # empty tuple
 
@@ -44,13 +46,14 @@ proc bindParam(
   n: int,
   val: auto): cint =
 
-  when val is openArray[byte]|seq[byte]:
+  when val is openArray[byte]|seq[byte]|DataBuffer:
    if val.len > 0:
      # `SQLITE_TRANSIENT` "indicate[s] that the object is to be copied prior
      # to the return from sqlite3_bind_*(). The object and pointer to it
      # must remain valid until then. SQLite will then manage the lifetime of
      # its private copy."
-      sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint,
+      var val = val
+      sqlite3_bind_blob(s, n.cint, addr val[0], val.len.cint,
        SQLITE_TRANSIENT)
    else:
      sqlite3_bind_null(s, n.cint)
@@ -66,7 +69,10 @@ proc bindParam(
   # to the return from sqlite3_bind_*(). The object and pointer to it must
   # remain valid until then. SQLite will then manage the lifetime of its
   # private copy."
-    sqlite3_bind_text(s, n.cint, val.cstring, -1.cint, SQLITE_TRANSIENT)
+    sqlite3_bind_text(s, n.cint, val.cstring, val.len().cint, SQLITE_TRANSIENT)
+  elif val is KeyId:
+    # same as previous
+    sqlite3_bind_text(s, n.cint, cast[cstring](baseAddr val.data), val.data.len().cint, SQLITE_TRANSIENT)
   else:
     {.fatal: "Please add support for the '" & $typeof(val) & "' type".}
diff --git a/datastore/threads/asyncsemaphore.nim b/datastore/threads/asyncsemaphore.nim
new file mode 100644
index 00000000..b561882c
--- /dev/null
+++ b/datastore/threads/asyncsemaphore.nim
@@ -0,0 +1,101 @@
+# Nim-Libp2p
+# Copyright (c) 2023 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+# at your option.
+# This file may not be copied, modified, or distributed except according to
+# those terms.
+
+{.push raises: [].}
+
+import sequtils
+import chronos, chronicles
+
+# TODO: this should probably go in chronos
+
+logScope:
+  topics = "datastore semaphore"
+
+type
+  AsyncSemaphore* = ref object of RootObj
+    size*: int
+    count: int
+    exit: bool
+    queue: seq[Future[void]]
+
+func new*(_: type AsyncSemaphore, size: int): AsyncSemaphore =
+  AsyncSemaphore(size: size, count: size)
+
+proc `count`*(s: AsyncSemaphore): int = s.count
+
+proc closeAll*(s: AsyncSemaphore) {.async.} =
+  s.exit = true
+  await allFutures(s.queue)
+
+proc tryAcquire*(s: AsyncSemaphore): bool =
+  ## Attempts to acquire a resource, if successful
+  ## returns true, otherwise false
+  ##
+
+  if s.count > 0 and s.queue.len == 0 and not s.exit:
+    s.count.dec
+    trace "Acquired slot", available = s.count, queue = s.queue.len
+    return true
+
+proc acquire*(s: AsyncSemaphore): Future[void] =
+  ## Acquire a resource and decrement the resource
+  ## counter. If no more resources are available,
+  ## the returned future will not complete until
+  ## the resource count goes above 0.
+  ##
+
+  let fut = newFuture[void]("AsyncSemaphore.acquire")
+  if s.tryAcquire():
+    fut.complete()
+    return fut
+
+  proc cancellation(udata: pointer) {.gcsafe.} =
+    fut.cancelCallback = nil
+    if not fut.finished:
+      s.queue.keepItIf( it != fut )
+
+  fut.cancelCallback = cancellation
+
+  s.queue.add(fut)
+
+  trace "Queued slot", available = s.count, queue = s.queue.len
+  return fut
+
+proc forceAcquire*(s: AsyncSemaphore) =
+  ## ForceAcquire will always succeed,
+  ## creating a temporary slot if required.
+  ## This temporary slot will stay usable until
+  ## there are fewer `acquire`s than `release`s
+  s.count.dec
+
+proc release*(s: AsyncSemaphore) =
+  ## Release a resource from the semaphore,
+  ## by picking the first future from the queue
+  ## and completing it and incrementing the
+  ## internal resource count
+  ##
+
+  doAssert(s.count <= s.size)
+  if s.count < s.size:
+    trace "Releasing slot", available = s.count,
+                            queue = s.queue.len
+
+    s.count.inc
+    while s.queue.len > 0:
+      var fut = s.queue[0]
+      s.queue.delete(0)
+      if not fut.finished():
+        s.count.dec
+        fut.complete()
+        break
+
+    trace "Released slot", available = s.count,
+                           queue = s.queue.len
+    return
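How the semaphore is meant to be used from async code (an illustrative sketch; threadproxyds.nim below uses the same pattern for backpressure):

    import pkg/chronos
    import pkg/datastore/threads/asyncsemaphore

    proc worker(sem: AsyncSemaphore) {.async.} =
      await sem.acquire()      # parks here while all slots are taken
      try:
        discard                # bounded work goes here
      finally:
        sem.release()          # completes the next queued acquire

    let sem = AsyncSemaphore.new(2)  # at most two workers in flight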
diff --git a/datastore/threads/databuffer.nim b/datastore/threads/databuffer.nim
new file mode 100644
index 00000000..5ea7b4be
--- /dev/null
+++ b/datastore/threads/databuffer.nim
@@ -0,0 +1,138 @@
+import threading/smartptrs
+import std/hashes
+import pkg/stew/ptrops
+
+export hashes
+
+type
+  DataBufferOpt* = enum
+    dbNullTerminate
+
+  DataBufferHolder* = object
+    buf: ptr UncheckedArray[byte]
+    size: int
+    cap: int
+
+  DataBuffer* = SharedPtr[DataBufferHolder] ##\
+    ## A fixed length data buffer using a SharedPtr.
+    ## It is thread safe even with `refc` since
+    ## it doesn't use string or seq types internally.
+    ##
+
+proc `=destroy`*(x: var DataBufferHolder) =
+  ## copy pointer implementation
+  ##
+
+  if x.buf != nil:
+    # echo "buffer: FREE: ", repr x.buf.pointer
+    deallocShared(x.buf)
+
+proc len*(a: DataBuffer): int =
+  if a.isNil: 0 else: a[].size
+
+proc capacity*(a: DataBuffer): int =
+  if a.isNil: 0 else: a[].cap
+
+proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a)
+
+proc hash*(a: DataBuffer): Hash =
+  a[].buf.toOpenArray(0, a[].size-1).hash()
+
+proc `[]`*(db: DataBuffer, idx: int): var byte =
+  if idx >= db.len():
+    raise newException(IndexDefect, "index out of bounds")
+  db[].buf[idx]
+
+template `==`*[T: char | byte](a: DataBuffer, b: openArray[T]): bool =
+  if a.isNil: false
+  elif a[].size != b.len: false
+  else: a.hash() == b.hash()
+
+proc new(tp: type DataBuffer, capacity: int = 0): DataBuffer =
+  ## allocate new buffer with given capacity
+  ##
+
+  newSharedPtr(DataBufferHolder(
+    buf: cast[typeof(result[].buf)](allocShared0(capacity)),
+    size: 0,
+    cap: capacity,
+  ))
+
+proc new*[T: byte | char](tp: type DataBuffer, data: openArray[T], opts: set[DataBufferOpt] = {}): DataBuffer =
+  ## allocate a new buffer and copy the input data from an openArray
+  ##
+  let dataCap =
+    if dbNullTerminate in opts: data.len() + 1
+    else: data.len()
+  result = DataBuffer.new(dataCap)
+  if data.len() > 0:
+    copyMem(result[].buf, baseAddr data, data.len())
+  result[].size = data.len()
+
+# proc new*(tp: type DataBuffer, data: pointer, first, last: int): DataBuffer =
+#   DataBuffer.new(toOpenArray(cast[ptr UncheckedArray[byte]](data), first, last))
+
+proc baseAddr*(db: DataBuffer): pointer =
+  db[].buf
+
+proc clear*(db: DataBuffer) =
+  zeroMem(db[].buf, db[].cap)
+  db[].size = 0
+
+proc setData*[T: byte | char](db: DataBuffer, data: openArray[T]) =
+  ## copy the input data from an openArray into the buffer
+  ##
+  if data.len() > db[].cap:
+    raise newException(IndexDefect, "data too large for buffer")
+  db.clear() # this is expensive, but we can optimize later
+  copyMem(db[].buf, baseAddr data, data.len())
+  db[].size = data.len()
+
+proc toSeq*(self: DataBuffer): seq[byte] =
+  ## convert buffer to a seq[byte] using a copy
+  ##
+
+  result = newSeq[byte](self.len)
+  if self.len() > 0:
+    copyMem(addr result[0], addr self[].buf[0], self.len)
+
+proc `@`*(self: DataBuffer): seq[byte] =
+  ## convert buffer to a seq[byte] using a copy
+  ##
+
+  self.toSeq()
+
+proc toString*(data: DataBuffer): string =
+  ## convert buffer to string type using copy
+  ##
+
+  if data.isNil: return ""
+  result = newString(data.len())
+  if data.len() > 0:
+    copyMem(addr result[0], addr data[].buf[0], data.len)
+
+proc `$`*(data: DataBuffer): string =
+  ## convert buffer to string type using copy
+  ##
+
+  data.toString()
+
+proc `==`*(a, b: DataBuffer): bool =
+  if a.isNil and b.isNil: result = true
+  elif a.isNil or b.isNil: result = false
+  elif a[].size != b[].size: result = false
+  elif a[].buf == b[].buf: result = true
+  else: result = a.hash() == b.hash()
+
+converter toBuffer*(err: ref CatchableError): DataBuffer =
+  ## convert an exception's message to a DataBuffer
+  ##
+
+  return DataBuffer.new(err.msg)
+
+template toOpenArray*[T: byte | char](data: DataBuffer, t: typedesc[T]): openArray[T] =
+  ## get an openArray view over the DataBuffer
+  ##
+  ## the element type is explicit since sqlite treats string
+  ## differently from openArray[byte]
+  let bf = cast[ptr UncheckedArray[T]](data[].buf)
+  bf.toOpenArray(0, data[].size-1)
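A DataBuffer round trip, to make the copy semantics concrete (illustrative sketch):

    import pkg/datastore/threads/databuffer

    let
      buf = DataBuffer.new "hello"   # copies the bytes into shared memory
      cpy = @buf                     # copies them back out as seq[byte]

    assert buf == "hello"            # content compare, via hash over the bytes
    assert $buf == "hello"           # string conversion also copies
    assert cpy.len == 5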
threadproxyds" + +type + + ThreadBackendKinds* = enum + Sqlite + # Filesystem + + ThreadBackend* = object + ## backend case type to avoid needing to make ThreadDatastore generic + case kind*: ThreadBackendKinds + of Sqlite: + sql*: SQLiteBackend[KeyId,DataBuffer] + + TaskCtxObj*[T: ThreadTypes] = object + res: ThreadResult[T] + signal: ThreadSignalPtr + running: bool ## used to mark when a task worker is running + cancelled: bool ## used to cancel a task before it's started + + TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] + ## Task context object. + ## This is a SharedPtr to make the query iter simpler + + ThreadDatastore* = ref object of Datastore + tp: Taskpool + backend: ThreadBackend + semaphore: AsyncSemaphore # semaphore is used for backpressure \ + # to avoid exhausting file descriptors + +var ctxLock: Lock +ctxLock.initLock() + +proc setCancelled[T](ctx: TaskCtx[T]) = + # withLock(ctxLock): + ctx[].cancelled = true + +proc setRunning[T](ctx: TaskCtx[T]): bool = + # withLock(ctxLock): + if ctx[].cancelled: + return false + ctx[].running = true + return true +proc setDone[T](ctx: TaskCtx[T]) = + # withLock(ctxLock): + ctx[].running = false + +proc acquireSignal(): ?!ThreadSignalPtr = + let signal = ThreadSignalPtr.new() + if signal.isErr(): + failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error()) + else: + success signal.get() + +template executeTask[T](ctx: TaskCtx[T], blk: untyped) = + ## executes a task on a thread work and handles cleanup after cancels/errors + ## + try: + if not ctx.setRunning(): + return + + ## run backend command + let res = `blk` + if res.isOk(): + when T is void: + ctx[].res.ok() + else: + ctx[].res.ok(res.get()) + else: + ctx[].res.err res.error().toThreadErr() + + except CatchableError as exc: + trace "Unexpected exception thrown in async task", exc = exc.msg + ctx[].res.err exc.toThreadErr() + # except Exception as exc: + # trace "Unexpected defect thrown in async task", exc = exc.msg + # ctx[].res.err exc.toThreadErr() + finally: + ctx.setDone() + discard ctx[].signal.fireSync() + +template dispatchTaskWrap[T](self: ThreadDatastore, + signal: ThreadSignalPtr, + blk: untyped + ): auto = + case self.backend.kind: + of Sqlite: + var ds {.used, inject.} = self.backend.sql + proc runTask() = + `blk` + runTask() + await wait(ctx[].signal) + +template dispatchTask[T](self: ThreadDatastore, + signal: ThreadSignalPtr, + blk: untyped + ): auto = + ## handles dispatching a task from an async context + ## `blk` is the actions, it has `ctx` and `ds` variables in scope. + ## note that `ds` is a generic + let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) + try: + dispatchTaskWrap[T](self, signal, blk) + except CancelledError as exc: + trace "Cancelling thread future!", exc = exc.msg + ctx.setCancelled() + raise exc + finally: + discard ctx[].signal.close() + self.semaphore.release() + + +proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = + ## run backend command + executeTask(ctx): + has(ds, key) + +method has*(self: ThreadDatastore, + key: Key): Future[?!bool] {.async.} = + await self.semaphore.acquire() + without signal =? 
diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim
new file mode 100644
index 00000000..dcc16800
--- /dev/null
+++ b/datastore/threads/threadproxyds.nim
@@ -0,0 +1,380 @@
+
+when not compileOption("threads"):
+  {.error: "This module requires --threads:on compilation flag".}
+
+import pkg/upraises
+
+push: {.upraises: [].}
+
+import std/tables
+import std/locks
+import std/sugar
+
+
+import pkg/chronos
+import pkg/chronos/threadsync
+import pkg/questionable
+import pkg/questionable/results
+import pkg/taskpools
+import pkg/chronicles
+import pkg/threading/smartptrs
+
+import ../key
+import ../query
+import ../datastore
+import ../backend
+import ../sql/sqliteds
+
+import ./asyncsemaphore
+import ./databuffer
+import ./threadresult
+
+export threadresult
+
+logScope:
+  topics = "datastore threadproxyds"
+
+type
+
+  ThreadBackendKinds* = enum
+    Sqlite
+    # Filesystem
+
+  ThreadBackend* = object
+    ## backend case type to avoid needing to make ThreadDatastore generic
+    case kind*: ThreadBackendKinds
+    of Sqlite:
+      sql*: SQLiteBackend[KeyId,DataBuffer]
+
+  TaskCtxObj*[T: ThreadTypes] = object
+    res: ThreadResult[T]
+    signal: ThreadSignalPtr
+    running: bool ## used to mark when a task worker is running
+    cancelled: bool ## used to cancel a task before it's started
+
+  TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
+    ## Task context object.
+    ## This is a SharedPtr to make the query iter simpler
+
+  ThreadDatastore* = ref object of Datastore
+    tp: Taskpool
+    backend: ThreadBackend
+    semaphore: AsyncSemaphore # semaphore is used for backpressure \
+      # to avoid exhausting file descriptors
+
+var ctxLock: Lock
+ctxLock.initLock()
+
+proc setCancelled[T](ctx: TaskCtx[T]) =
+  # withLock(ctxLock):
+  ctx[].cancelled = true
+
+proc setRunning[T](ctx: TaskCtx[T]): bool =
+  # withLock(ctxLock):
+  if ctx[].cancelled:
+    return false
+  ctx[].running = true
+  return true
+
+proc setDone[T](ctx: TaskCtx[T]) =
+  # withLock(ctxLock):
+  ctx[].running = false
+
+proc acquireSignal(): ?!ThreadSignalPtr =
+  let signal = ThreadSignalPtr.new()
+  if signal.isErr():
+    failure (ref CatchableError)(msg: "failed to acquire ThreadSignalPtr: " & signal.error())
+  else:
+    success signal.get()
+
+template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
+  ## executes a task on a worker thread and handles cleanup after cancellations/errors
+  ##
+  try:
+    if not ctx.setRunning():
+      return
+
+    ## run backend command
+    let res = `blk`
+    if res.isOk():
+      when T is void:
+        ctx[].res.ok()
+      else:
+        ctx[].res.ok(res.get())
+    else:
+      ctx[].res.err res.error().toThreadErr()
+
+  except CatchableError as exc:
+    trace "Unexpected exception thrown in async task", exc = exc.msg
+    ctx[].res.err exc.toThreadErr()
+  # except Exception as exc:
+  #   trace "Unexpected defect thrown in async task", exc = exc.msg
+  #   ctx[].res.err exc.toThreadErr()
+  finally:
+    ctx.setDone()
+    discard ctx[].signal.fireSync()
+
+template dispatchTaskWrap[T](self: ThreadDatastore,
+                             signal: ThreadSignalPtr,
+                             blk: untyped
+                            ): auto =
+  case self.backend.kind:
+  of Sqlite:
+    var ds {.used, inject.} = self.backend.sql
+    proc runTask() =
+      `blk`
+    runTask()
+    await wait(ctx[].signal)
+
+template dispatchTask[T](self: ThreadDatastore,
+                         signal: ThreadSignalPtr,
+                         blk: untyped
+                        ): auto =
+  ## handles dispatching a task from an async context
+  ## `blk` is the action; it has `ctx` and `ds` variables in scope.
+  ## note that `ds` is a generic
+  let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
+  try:
+    dispatchTaskWrap[T](self, signal, blk)
+  except CancelledError as exc:
+    trace "Cancelling thread future!", exc = exc.msg
+    ctx.setCancelled()
+    raise exc
+  finally:
+    discard ctx[].signal.close()
+    self.semaphore.release()
+
+
+proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
+  ## run backend command
+  executeTask(ctx):
+    has(ds, key)
+
+method has*(self: ThreadDatastore,
+            key: Key): Future[?!bool] {.async.} =
+  await self.semaphore.acquire()
+  without signal =? acquireSignal(), err:
+    return failure err
+
+  let key = KeyId.new key.id()
+  dispatchTask[bool](self, signal):
+    self.tp.spawn hasTask(ctx, ds, key)
+  return ctx[].res.toRes(v => v)
+
+
+proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
+                       key: KeyId) {.gcsafe.} =
+  ## run backend command
+  executeTask(ctx):
+    delete(ds, key)
+
+method delete*(self: ThreadDatastore,
+               key: Key): Future[?!void] {.async.} =
+  ## delete key
+  await self.semaphore.acquire()
+  without signal =? acquireSignal(), err:
+    return failure err
+
+  let key = KeyId.new key.id()
+  dispatchTask[void](self, signal):
+    self.tp.spawn deleteTask(ctx, ds, key)
+
+  return ctx[].res.toRes()
+
+method delete*(self: ThreadDatastore,
+               keys: seq[Key]): Future[?!void] {.async.} =
+  ## delete batch
+  for key in keys:
+    if err =? (await self.delete(key)).errorOption:
+      return failure err
+
+  return success()
+
+
+proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
+                    key: KeyId,
+                    data: DataBuffer) {.gcsafe, nimcall.} =
+  executeTask(ctx):
+    put(ds, key, data)
+
+method put*(self: ThreadDatastore,
+            key: Key,
+            data: seq[byte]): Future[?!void] {.async.} =
+  ## put key with data
+  await self.semaphore.acquire()
+  without signal =? acquireSignal(), err:
+    return failure err
+
+  dispatchTask[void](self, signal):
+    let key = KeyId.new key.id()
+    let data = DataBuffer.new data
+    self.tp.spawn putTask(ctx, ds, key, data)
+
+  return ctx[].res.toRes()
+
+method put*(
+  self: ThreadDatastore,
+  batch: seq[BatchEntry]): Future[?!void] {.async.} =
+  ## put batch data
+  for entry in batch:
+    if err =? (await self.put(entry.key, entry.data)).errorOption:
+      return failure err
+
+  return success()
+
+
+proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
+                 key: KeyId) {.gcsafe, nimcall.} =
+  ## run backend command
+  executeTask(ctx):
+    let res = get(ds, key)
+    res
+
+method get*(self: ThreadDatastore,
+            key: Key,
+           ): Future[?!seq[byte]] {.async.} =
+  await self.semaphore.acquire()
+  without signal =? acquireSignal(), err:
+    return failure err
+
+  let key = KeyId.new key.id()
+  dispatchTask[DataBuffer](self, signal):
+    self.tp.spawn getTask(ctx, ds, key)
+
+  return ctx[].res.toRes(v => v.toSeq())
+
+method close*(self: ThreadDatastore): Future[?!void] {.async.} =
+  await self.semaphore.closeAll()
+  case self.backend.kind:
+  of Sqlite:
+    self.backend.sql.close()
+
+type
+  QResult = DbQueryResponse[KeyId, DataBuffer]
+
+import os
+
+proc queryTask[DB](
+  ctx: TaskCtx[QResult],
+  ds: DB,
+  query: DbQuery[KeyId],
+  nextSignal: ThreadSignalPtr
+) {.gcsafe, nimcall.} =
+  ## run query command
+  executeTask(ctx):
+    # we execute this all inside `executeTask`
+    # so we need to return a final result
+    let handleRes = ds.query(query)
+    if handleRes.isErr():
+      # set error and exit executeTask, which will fire final signal
+      (?!QResult).err(handleRes.error())
+    else:
+      # otherwise manually set an empty ok result
+      ctx[].res.ok (KeyId.none, DataBuffer(), )
+      discard ctx[].signal.fireSync()
+      if not nextSignal.waitSync(10.seconds).get():
+        raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
+
+      var handle = handleRes.get()
+      for item in handle.iter():
+        # wait for next request from async thread
+
+        if ctx[].cancelled:
+          # cancel iter, then run next cycle so it'll finish and close
+          handle.cancel = true
+          continue
+        else:
+          ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
+            exc
+
+        discard ctx[].signal.fireSync()
+
+        discard nextSignal.waitSync().get()
+
+      # set final result
+      (?!QResult).ok((KeyId.none, DataBuffer()))
+
+method query*(self: ThreadDatastore,
+              q: Query
+             ): Future[?!QueryIter] {.async.} =
+  ## performs async query
+  ## keeps one thread running queryTask until finished
+  ##
+  await self.semaphore.acquire()
+  without signal =? acquireSignal(), err:
+    return failure err
+  without nextSignal =? acquireSignal(), err:
+    return failure err
+
+  try:
+    let query = dbQuery(
+      key = KeyId.new q.key.id(),
+      value = q.value, limit = q.limit, offset = q.offset, sort = q.sort)
+
+    # setup initial queryTask
+    let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
+    dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
+      self.tp.spawn queryTask(ctx, ds, query, nextSignal)
+    await nextSignal.fire()
+
+    var
+      lock = newAsyncLock() # serialize querying under threads
+      iter = QueryIter.new()
+
+    proc next(): Future[?!QueryResponse] {.async.} =
+      let ctx = ctx
+      try:
+        trace "About to query"
+        if lock.locked:
+          return failure (ref DatastoreError)(msg: "Should always await query futures")
+        if iter.finished == true:
+          return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
+
+        await wait(ctx[].signal)
+
+        if not ctx[].running:
+          iter.finished = true
+
+        defer:
+          await nextSignal.fire()
+
+        if ctx[].res.isErr():
+          return err(ctx[].res.error())
+        else:
+          let qres = ctx[].res.get()
+          let key = qres.key.map(proc (k: KeyId): Key = k.toKey())
+          let data = qres.data.toSeq()
+          return (?!QueryResponse).ok((key: key, data: data))
+      except CancelledError as exc:
+        trace "Cancelling thread future!", exc = exc.msg
+        ctx.setCancelled()
+        discard ctx[].signal.close()
+        discard nextSignal.close()
+        self.semaphore.release()
+        raise exc
+
+    iter.next = next
+    return success iter
+  except CancelledError as exc:
+    trace "Cancelling thread future!", exc = exc.msg
+    discard signal.close()
+    discard nextSignal.close()
+    self.semaphore.release()
+    raise exc
+
+proc new*[DB](self: type ThreadDatastore,
+              db: DB,
+              withLocks = static false,
+              tp: Taskpool
+             ): ?!ThreadDatastore =
+  doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
+
+  when DB is SQLiteBackend[KeyId,DataBuffer]:
+    let backend = ThreadBackend(kind: Sqlite, sql: db)
+  else:
+    {.error: "unsupported backend: " & $typeof(db).}
+
+  success ThreadDatastore(
+    tp: tp,
+    backend: backend,
+    semaphore: AsyncSemaphore.new(tp.numThreads - 1)
+  )
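End to end, the thread proxy wraps a synchronous backend behind the async Datastore API. A hypothetical setup, using only names defined in this diff plus `Taskpool.new` from pkg/taskpools:

    import pkg/taskpools
    import pkg/questionable/results
    import pkg/datastore/sql/sqliteds
    import pkg/datastore/threads/threadproxyds

    let
      tp = Taskpool.new(4)  # needs > 1 thread; the semaphore size is numThreads - 1
      db = newSQLiteBackend[KeyId, DataBuffer](path = Memory).tryGet()
      ds = ThreadDatastore.new(db, tp = tp).tryGet()

    # `ds` now behaves like any async Datastore:
    #   await ds.put(key, bytes); await ds.get(key); await ds.query(q)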
CatchableError)(msg: $e[1]) + of ErrorEnum.DefectErr: (ref CatchableError)(msg: "defect: " & $e[1]) + +proc toRes*(res: ThreadResult[void]): ?!void = + res.mapErr() do(e: ThreadResErr) -> ref CatchableError: + e.toExc() + +proc toRes*[T,S](res: ThreadResult[T], + m: proc(v: T): S = proc(v: T): T = v): ?!S = + # todo: cleaner way to do this? + if res.isErr(): + result.err res.error().toExc() + else: + result.ok m(res.get()) diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim index 4eca23be..8448f9bf 100644 --- a/datastore/tieredds.nim +++ b/datastore/tieredds.nim @@ -74,13 +74,16 @@ method get*( bytes: seq[byte] for store in self.stores: - without bytes =? (await store.get(key)): - continue + without bytes =? (await store.get(key)), err: + if err of DatastoreKeyNotFound: + continue + else: + return failure(err) if bytes.len <= 0: continue - # put found data into stores logically in front of the current store + # put found data into stores in front of the current store for s in self.stores: if s == store: break if( @@ -88,7 +91,10 @@ method get*( res.isErr): return failure res.error - return success bytes + if bytes.len > 0: + return success bytes + + return failure (ref DatastoreKeyNotFound)(msg: "Key not found") method put*( self: TieredDatastore, diff --git a/datastore/types.nim b/datastore/types.nim index b019cdb0..473db527 100644 --- a/datastore/types.nim +++ b/datastore/types.nim @@ -6,5 +6,6 @@ const type DatastoreError* = object of CatchableError DatastoreKeyNotFound* = object of DatastoreError + QueryEndedError* = object of DatastoreError Datastore* = ref object of RootObj diff --git a/nimble.lock b/nimble.lock new file mode 100644 index 00000000..3a208e32 --- /dev/null +++ b/nimble.lock @@ -0,0 +1,214 @@ +{ + "version": 2, + "packages": { + "upraises": { + "version": "0.1.0", + "vcsRevision": "d9f268db1021959fe0f2c7a5e49fba741f9932a0", + "url": "https://github.com/markspanbroek/upraises", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "176234f808b44a0be763df706ed634d6e8df17bb" + } + }, + "results": { + "version": "0.4.0", + "vcsRevision": "f3c666a272c69d70cb41e7245e7f6844797303ad", + "url": "https://github.com/arnetheduck/nim-results", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "51e08ca9524db98dc909fb39192272cc2b5451c7" + } + }, + "unittest2": { + "version": "0.1.0", + "vcsRevision": "2300fa9924a76e6c96bc4ea79d043e3a0f27120c", + "url": "https://github.com/status-im/nim-unittest2", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "914cf9a380c83b2ae40697981e5d94903505e87e" + } + }, + "stew": { + "version": "0.1.0", + "vcsRevision": "3159137d9a3110edb4024145ce0ba778975de40e", + "url": "https://github.com/status-im/nim-stew", + "downloadMethod": "git", + "dependencies": [ + "results", + "unittest2" + ], + "checksums": { + "sha1": "4ab494e272e997011853faddebe9e55183613776" + } + }, + "faststreams": { + "version": "0.3.0", + "vcsRevision": "720fc5e5c8e428d9d0af618e1e27c44b42350309", + "url": "https://github.com/status-im/nim-faststreams", + "downloadMethod": "git", + "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "ab178ba25970b95d953434b5d86b4d60396ccb64" + } + }, + "serialization": { + "version": "0.2.0", + "vcsRevision": "4bdbc29e54fe54049950e352bb969aab97173b35", + "url": "https://github.com/status-im/nim-serialization", + "downloadMethod": "git", + "dependencies": [ + "faststreams", + "unittest2", + "stew" + ], + "checksums": { + "sha1": 
"c8c99a387aae488e7008aded909ebfe662e74450" + } + }, + "sqlite3_abi": { + "version": "3.40.1.1", + "vcsRevision": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3", + "url": "https://github.com/arnetheduck/nim-sqlite3-abi", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "8e91db8156a82383d9c48f53b33e48f4e93077b1" + } + }, + "testutils": { + "version": "0.5.0", + "vcsRevision": "dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34", + "url": "https://github.com/status-im/nim-testutils", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "756d0757c4dd06a068f9d38c7f238576ba5ee897" + } + }, + "json_serialization": { + "version": "0.1.5", + "vcsRevision": "85b7ea093cb85ee4f433a617b97571bd709d30df", + "url": "https://github.com/status-im/nim-json-serialization", + "downloadMethod": "git", + "dependencies": [ + "serialization", + "stew" + ], + "checksums": { + "sha1": "c6b30565292acf199b8be1c62114726e354af59e" + } + }, + "chronicles": { + "version": "0.10.3", + "vcsRevision": "32ac8679680ea699f7dbc046e8e0131cac97d41a", + "url": "https://github.com/status-im/nim-chronicles", + "downloadMethod": "git", + "dependencies": [ + "testutils", + "json_serialization" + ], + "checksums": { + "sha1": "79f09526d4d9b9196dd2f6a75310d71a890c4f88" + } + }, + "asynctest": { + "version": "0.3.2", + "vcsRevision": "a236a5f0f3031573ac2cb082b63dbf6e170e06e7", + "url": "https://github.com/markspanbroek/asynctest", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "0ef50d086659835b0a23a4beb77cb11747695448" + } + }, + "httputils": { + "version": "0.3.0", + "vcsRevision": "87b7cbf032c90b9e6b446081f4a647e950362cec", + "url": "https://github.com/status-im/nim-http-utils", + "downloadMethod": "git", + "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "72a138157a9951f0986a9c4afc8c9a83ce3979a8" + } + }, + "taskpools": { + "version": "0.0.4", + "vcsRevision": "b3673c7a7a959ccacb393bd9b47e997bbd177f5a", + "url": "https://github.com/status-im/nim-taskpools", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "e43c5170d4e9ef1b27dd0956ffa0db46f992f9a6" + } + }, + "pretty": { + "version": "0.1.0", + "vcsRevision": "60850f8c595d4f0b9e55f3a99c7c471244a3182a", + "url": "https://github.com/treeform/pretty", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "b0a1cf8607d169050acd2bdca8d76fe850c23648" + } + }, + "bearssl": { + "version": "0.2.1", + "vcsRevision": "e4157639db180e52727712a47deaefcbbac6ec86", + "url": "https://github.com/status-im/nim-bearssl", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "a5086fd5c0af2b852f34c0cc6e4cff93a98f97ec" + } + }, + "chronos": { + "version": "3.2.0", + "vcsRevision": "0277b65be2c7a365ac13df002fba6e172be55537", + "url": "https://github.com/status-im/nim-chronos", + "downloadMethod": "git", + "dependencies": [ + "stew", + "bearssl", + "httputils", + "unittest2" + ], + "checksums": { + "sha1": "78a41db7fb05b937196d4fa2f1e3fb4353b36a07" + } + }, + "questionable": { + "version": "0.10.10", + "vcsRevision": "b3cf35ac450fd42c9ea83dc084f5cba2efc55da3", + "url": "https://github.com/markspanbroek/questionable", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "8bb23a05d7f21619010471aa009e27d3fa73d93a" + } + }, + "threading": { + "version": "0.2.0", + "vcsRevision": "bcc284991ba928d1ed299a81a93b7113cc7de04f", + "url": "https://github.com/nim-lang/threading", + "downloadMethod": "git", + 
"dependencies": [], + "checksums": { + "sha1": "08dfc46cedc3fc202e5de6ef80a655598331eb89" + } + } + }, + "tasks": {} +} diff --git a/tests/datastore/dscommontests.nim b/tests/datastore/dscommontests.nim index 1e0353cb..beda975e 100644 --- a/tests/datastore/dscommontests.nim +++ b/tests/datastore/dscommontests.nim @@ -3,8 +3,9 @@ import std/options import pkg/asynctest import pkg/chronos import pkg/stew/results +import pkg/questionable/results -import pkg/datastore +import pkg/datastore/datastore proc basicStoreTests*( ds: Datastore, @@ -56,3 +57,9 @@ proc basicStoreTests*( for k in batch: check: not (await ds.has(k)).tryGet + + test "handle missing key": + let key = Key.init("/missing/key").tryGet() + + expect(DatastoreKeyNotFound): + discard (await ds.get(key)).tryGet() # non existing key diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim index 4f0a15d8..460804b7 100644 --- a/tests/datastore/querycommontests.nim +++ b/tests/datastore/querycommontests.nim @@ -36,9 +36,14 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var res: seq[QueryResponse] + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + res.add((key, val)) + res check: res.len == 3 @@ -63,9 +68,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 3 @@ -90,9 +106,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 2 @@ -117,9 +144,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = iter = (await ds.query(q)).tryGet var - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res res.sort do (a, b: QueryResponse) -> int: cmp(a.key.get.id, b.key.get.id) @@ -152,9 +190,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 10 @@ -175,9 +224,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + 
cnt.inc + + res check: res.len == 10 @@ -198,9 +258,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 5 @@ -237,9 +308,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = kvs = kvs.reversed let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 100 diff --git a/tests/datastore/sql/testsqlite.nim b/tests/datastore/sql/testsqlite.nim new file mode 100644 index 00000000..b63bac7c --- /dev/null +++ b/tests/datastore/sql/testsqlite.nim @@ -0,0 +1,90 @@ +import std/options +import std/os +import std/sequtils +from std/algorithm import sort, reversed + +import pkg/asynctest +import pkg/chronos +import pkg/stew/results +import pkg/stew/byteutils + +import pkg/datastore/sql + +import ../dscommontests +import ../querycommontests + +suite "Test Basic SQLiteDatastore": + let + ds = SQLiteDatastore.new(Memory).tryGet() + key = Key.init("a:b/c/d:e").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes + + teardownAll: + (await ds.close()).tryGet() + + basicStoreTests(ds, key, bytes, otherBytes) + + +suite "Test Read Only SQLiteDatastore": + let + path = currentSourcePath() # get this file's name + basePath = "tests_data" + basePathAbs = path.parentDir / basePath + filename = "test_store" & DbExt + dbPathAbs = basePathAbs / filename + key = Key.init("a:b/c/d:e").tryGet() + bytes = "some bytes".toBytes + + var + dsDb: SQLiteDatastore + readOnlyDb: SQLiteDatastore + + setupAll: + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + createDir(basePathAbs) + + dsDb = SQLiteDatastore.new(path = dbPathAbs).tryGet() + readOnlyDb = SQLiteDatastore.new(path = dbPathAbs, readOnly = true).tryGet() + + teardownAll: + (await dsDb.close()).tryGet() + (await readOnlyDb.close()).tryGet() + + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + + test "put": + check: + (await readOnlyDb.put(key, bytes)).isErr + + (await dsDb.put(key, bytes)).tryGet() + + test "get": + check: + (await readOnlyDb.get(key)).tryGet() == bytes + (await dsDb.get(key)).tryGet() == bytes + + test "delete": + check: + (await readOnlyDb.delete(key)).isErr + + (await dsDb.delete(key)).tryGet() + + test "contains": + check: + not (await readOnlyDb.has(key)).tryGet() + not (await dsDb.has(key)).tryGet() + +# suite "Test Query": +# var +# ds: SQLiteDatastore + +# setup: +# ds = SQLiteDatastore.new(Memory).tryGet() + +# teardown: +# (await ds.close()).tryGet + +# queryTests(ds) diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index c629eb0c..1309dcd3 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -3,87 +3,348 @@ import std/os import std/sequtils from std/algorithm import sort, reversed -import pkg/asynctest +import pkg/unittest2 import pkg/chronos import pkg/stew/results import pkg/stew/byteutils import pkg/datastore/sql/sqliteds +import pkg/datastore/key import 
../dscommontests
-import ../querycommontests
+
+proc testBasic[K, V](
+  ds: SQLiteBackend[K, V],
+  key: K,
+  bytes: V,
+  otherBytes: V,
+  batch: seq[DbBatchEntry[K, V]],
+  extended = true
+) =
+  ## Runs the shared put/get/delete/batch tests against a SQLite backend
+  ## instantiated for either GC'd (string / seq[byte]) or threadsafe
+  ## (KeyId / DataBuffer) key and value types.
+
+  test "put":
+    ds.put(key, bytes).tryGet()
+
+  test "get":
+    check:
+      ds.get(key).tryGet() == bytes
+
+  test "put update":
+    ds.put(key, otherBytes).tryGet()
+
+  test "get updated":
+    check:
+      ds.get(key).tryGet() == otherBytes
+
+  test "delete":
+    ds.delete(key).tryGet()
+
+  test "contains":
+    check key notin ds
+
+  test "put batch":
+    ds.put(batch).tryGet
+
+    for (k, v) in batch:
+      check: ds.has(k).tryGet
+
+  test "delete batch":
+    var keys: seq[K]
+    for (k, v) in batch:
+      keys.add(k)
+
+    ds.delete(keys).tryGet
+
+    for (k, v) in batch:
+      check: not ds.has(k).tryGet
+
+  test "handle missing key":
+    when K is KeyId:
+      let key = KeyId.new Key.init("/missing/key").tryGet().id()
+    elif K is string:
+      let key = $KeyId.new Key.init("/missing/key").tryGet().id()
+
+    expect(DatastoreKeyNotFound):
+      discard ds.get(key).tryGet() # nonexistent key
 
 suite "Test Basic SQLiteDatastore":
   let
-    ds = SQLiteDatastore.new(Memory).tryGet()
-    key = Key.init("a:b/c/d:e").tryGet()
+    ds = newSQLiteBackend[string, seq[byte]](path = Memory).tryGet()
+    keyFull = Key.init("a:b/c/d:e").tryGet()
+    key = keyFull.id()
     bytes = "some bytes".toBytes
     otherBytes = "some other bytes".toBytes
 
-  teardownAll:
-    (await ds.close()).tryGet()
+  var batch: seq[tuple[key: string, data: seq[byte]]]
+  for k in 0..<100:
+    let kk = Key.init(key, $k).tryGet().id()
+    batch.add((kk, @[k.byte]))
+
+  suiteTeardown:
+    ds.close().tryGet()
 
-  basicStoreTests(ds, key, bytes, otherBytes)
+  testBasic(ds, key, bytes, otherBytes, batch)
 
-suite "Test Read Only SQLiteDatastore":
+suite "Test DataBuffer SQLiteDatastore":
   let
-    path = currentSourcePath() # get this file's name
-    basePath = "tests_data"
-    basePathAbs = path.parentDir / basePath
-    filename = "test_store" & DbExt
-    dbPathAbs = basePathAbs / filename
-    key = Key.init("a:b/c/d:e").tryGet()
-    bytes = "some bytes".toBytes
+    ds = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
+    keyFull = Key.init("a:b/c/d:e").tryGet()
+    key = KeyId.new keyFull.id()
+    bytes = DataBuffer.new "some bytes"
+    otherBytes = DataBuffer.new "some other bytes"
 
-  var
-    dsDb: SQLiteDatastore
-    readOnlyDb: SQLiteDatastore
+  var batch: seq[tuple[key: KeyId, data: DataBuffer]]
+  for k in 0..<100:
+    let kk = Key.init(keyFull.id(), $k).tryGet().id()
+    batch.add((KeyId.new kk, DataBuffer.new @[k.byte]))
 
-  setupAll:
-    removeDir(basePathAbs)
-    require(not dirExists(basePathAbs))
-    createDir(basePathAbs)
+  suiteTeardown:
+    ds.close().tryGet()
 
-    dsDb = SQLiteDatastore.new(path = dbPathAbs).tryGet()
-    readOnlyDb = SQLiteDatastore.new(path = dbPathAbs, readOnly = true).tryGet()
+  testBasic(ds, key, bytes, otherBytes, batch)
 
-  teardownAll:
-    (await dsDb.close()).tryGet()
-    (await readOnlyDb.close()).tryGet()
-
-    removeDir(basePathAbs)
-    require(not dirExists(basePathAbs))
+suite "Test Query":
+
+  setup:
+    let
+      ds = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
+      key1 = KeyId.new "/a"
+      key2 = KeyId.new "/a/b"
+      key3 = KeyId.new "/a/b/c"
+      val1 = DataBuffer.new "value for 1"
+      val2 = DataBuffer.new "value for 2"
+      val3 = DataBuffer.new "value for 3"
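+
+  # `dbQuery` (see datastore/backend.nim) defaults to value = false,
+  # sort = SortOrder.Ascending, offset = 0 and limit = -1 (no explicit
+  # limit), so for example:
+  #   dbQuery(key = key1, value = true)   # key1 and its children, with data
+  #   dbQuery(key = key1, limit = 10)     # at most 10 keys, keys only
+  # The tests below exercise each of these flags against the SQLite backend.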
+
+  test "Key should query all keys and all its children":
+    let
+      q = dbQuery(key = key1, value = true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 3
+      res[0].key.get == key1
+      res[0].data == val1
+
+      res[1].key.get == key2
+      res[1].data == val2
+
+      res[2].key.get == key3
+      res[2].data == val3
-  test "put":
-    check:
-      (await readOnlyDb.put(key, bytes)).isErr
-
-    (await dsDb.put(key, bytes)).tryGet()
-
-  test "get":
-    check:
-      (await readOnlyDb.get(key)).tryGet() == bytes
-      (await dsDb.get(key)).tryGet() == bytes
-
-  test "delete":
-    check:
-      (await readOnlyDb.delete(key)).isErr
-
-    (await dsDb.delete(key)).tryGet()
-
-  test "contains":
-    check:
-      not (await readOnlyDb.has(key)).tryGet()
-      not (await dsDb.has(key)).tryGet()
-
-suite "Test Query":
-  var
-    ds: SQLiteDatastore
-
-  setup:
-    ds = SQLiteDatastore.new(Memory).tryGet()
+
+  test "Query should cancel":
+    let
+      q = dbQuery(key = key1, value = true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+
+    var res: seq[DbQueryResponse[KeyId, DataBuffer]]
+    var cnt = 0
+    for item in handle.iter():
+      cnt.inc
+      res.insert(item.tryGet(), 0)
+      if cnt > 1:
+        handle.cancel = true
+
+    check:
+      handle.cancel == true
+      handle.closed == true
+      res.len == 2
+
+      res[0].key.get == key2
+      res[0].data == val2
+
+      res[1].key.get == key1
+      res[1].data == val1
+
+  test "Key should query all keys without values":
+    let
+      q = dbQuery(key = key1, value = false)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+    let
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 3
+      res[0].key.get == key1
+      res[0].data.len == 0
+
+      res[1].key.get == key2
+      res[1].data.len == 0
+
+      res[2].key.get == key3
+      res[2].data.len == 0
+
+  test "Key should not query parent":
+    let
+      q = dbQuery(key = key2, value = true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+    let
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 2
+      res[0].key.get == key2
+      res[0].data == val2
+
+      res[1].key.get == key3
+      res[1].data == val3
+
+  test "Key should list all keys at the same level":
+    let
+      q = dbQuery(key = key1, value = true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    res.sort do (a, b: DbQueryResponse[KeyId, DataBuffer]) -> int:
+      cmp($a.key.get, $b.key.get)
+
+    check:
+      res.len == 3
+      res[0].key.get == key1
+      res[0].data == val1
+
+      res[1].key.get == key2
+      res[1].data == val2
+
+      res[2].key.get == key3
+      res[2].data == val3
+
+  test "Should apply limit":
+    let
+      key = Key.init("/a").tryGet
+      q = dbQuery(key = key1, limit = 10, value = false)
+
+    for i in 0..<100:
+      let
+        key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
+        val = DataBuffer.new("val " & $i)
+
+      ds.put(key, val).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+    let
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 10
+
+  test "Should apply offset":
+    let
+      key = Key.init("/a").tryGet
+      keyId = KeyId.new $key
+      q = dbQuery(key = keyId, offset = 90)
+
+    for i in 0..<100:
+      let
+        key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
+        val = DataBuffer.new("val " & $i)
+
+      ds.put(key, val).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+    let
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 10
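+
+  # Offset and limit are honored by the SQLite backend, but per the
+  # DbQuery field notes in datastore/backend.nim they are not available
+  # in every backend, so portable callers should not rely on them.
+  # As a sketch, a caller could page through results like this
+  # (hypothetical usage, not part of this suite):
+  #   for page in 0..<10:
+  #     let q = dbQuery(key = keyId, offset = page * 10, limit = 10)
+  #     # each handle then yields rows [page*10, page*10 + 10)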
+
+  test "Should apply offset and limit":
+    let
+      key = Key.init("/a").tryGet
+      keyId = KeyId.new $key
+      q = dbQuery(key = keyId, offset = 95, limit = 5)
+
+    for i in 0..<100:
+      let
+        key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
+        val = DataBuffer.new("val " & $i)
+
+      ds.put(key, val).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 5
+
+  test "Should apply sort order - descending":
+    let
+      key = Key.init("/a").tryGet
+      keyId = KeyId.new $key
+      q = dbQuery(key = keyId, sort = SortOrder.Descending, value = true)
+
+    var kvs: seq[DbQueryResponse[KeyId, DataBuffer]]
+
+    for i in 0..<100:
+      let
+        key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
+        val = DataBuffer.new("val " & $i)
+
+      kvs.add((key.some, val))
+      ds.put(key, val).tryGet
+
+    kvs.sort do (a, b: DbQueryResponse[KeyId, DataBuffer]) -> int:
+      cmp($a.key.get, $b.key.get)
+
+    kvs = kvs.reversed
+    var
+      handle = ds.query(q).tryGet
+      res = handle.iter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 100
+
+    for i in 0..<res.len:
+      check:
+        res[i].key.get == kvs[i].key.get
+        res[i].data == kvs[i].data
-
-  teardown:
-    (await ds.close()).tryGet
-
-  queryTests(ds)
diff --git a/tests/datastore/sql/testsqlitedsdb.nim b/tests/datastore/sql/testsqlitedsdb.nim
index d1049331..03fd98fe 100644
--- a/tests/datastore/sql/testsqlitedsdb.nim
+++ b/tests/datastore/sql/testsqlitedsdb.nim
@@ -28,7 +28,7 @@ suite "Test Open SQLite Datastore DB":
 
   test "Should create and open datastore DB":
     let
-      dsDb = SQLiteDsDb.open(
+      dsDb = SQLiteDsDb[string, seq[byte]].open(
         path = dbPathAbs,
         flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet()
 
@@ -40,7 +40,7 @@ suite "Test Open SQLite Datastore DB":
 
   test "Should open existing DB":
     let
-      dsDb = SQLiteDsDb.open(
+      dsDb = SQLiteDsDb[string, seq[byte]].open(
         path = dbPathAbs,
         flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet()
 
@@ -55,7 +55,7 @@ suite "Test Open SQLite Datastore DB":
       fileExists(dbPathAbs)
 
     let
-      dsDb = SQLiteDsDb.open(
+      dsDb = SQLiteDsDb[string, seq[byte]].open(
         path = dbPathAbs,
         flags = SQLITE_OPEN_READONLY).tryGet()
 
@@ -66,7 +66,7 @@
     removeDir(basePathAbs)
 
     check:
       not fileExists(dbPathAbs)
-      SQLiteDsDb.open(path = dbPathAbs).isErr
+      SQLiteDsDb[string, seq[byte]].open(path = dbPathAbs).isErr
 
 suite "Test SQLite Datastore DB operations":
   let
@@ -81,19 +81,19 @@
     otherData = "some other data".toBytes
 
   var
-    dsDb: SQLiteDsDb
-    readOnlyDb: SQLiteDsDb
+    dsDb: SQLiteDsDb[string, seq[byte]]
+    readOnlyDb: SQLiteDsDb[string, seq[byte]]
 
   setupAll:
     removeDir(basePathAbs)
     require(not dirExists(basePathAbs))
    createDir(basePathAbs)
 
-    dsDb = SQLiteDsDb.open(
+    dsDb = SQLiteDsDb[string, seq[byte]].open(
       path = dbPathAbs,
       flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet()
 
-    readOnlyDb = SQLiteDsDb.open(
+    readOnlyDb = SQLiteDsDb[string, seq[byte]].open(
       path = dbPathAbs,
       flags = SQLITE_OPEN_READONLY).tryGet()
 
@@ -111,12 +111,10 @@
     dsDb.putStmt.exec((key.id, data, timestamp())).tryGet()
 
   test "Should select key":
-    let
-      dataCol = dsDb.getDataCol
     var bytes: seq[byte]
 
     proc onData(s: RawStmtPtr) =
-      bytes = dataCol()
+      bytes = dataCol[seq[byte]](dsDb.getDataCol)
 
     check:
       dsDb.getStmt.query((key.id), onData).tryGet()
 
@@ -129,12 +127,10 @@
     dsDb.putStmt.exec((key.id, otherData, timestamp())).tryGet()
 
   test "Should select updated key":
-    let
-      dataCol = dsDb.getDataCol
     var bytes: seq[byte]
 
     proc onData(s: RawStmtPtr) =
-      bytes = dataCol()
+      bytes = dataCol[seq[byte]](dsDb.getDataCol)
 
     check:
       dsDb.getStmt.query((key.id), onData).tryGet()
diff --git a/tests/datastore/testasyncsemaphore.nim b/tests/datastore/testasyncsemaphore.nim
new file mode 100644
index 00000000..3c8bb84b
--- /dev/null
+++ b/tests/datastore/testasyncsemaphore.nim
@@ -0,0 +1,206 @@
+{.used.}
+
+# Nim-Libp2p
+# Copyright (c) 2023 Status Research & Development GmbH
+# Licensed under 
either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import random + +import pkg/chronos +import pkg/asynctest + +import pkg/datastore/threads/asyncsemaphore + +randomize() + +suite "AsyncSemaphore": + test "Should acquire": + let sema = AsyncSemaphore.new(3) + + await sema.acquire() + await sema.acquire() + await sema.acquire() + + check sema.count == 0 + + test "Should release": + let sema = AsyncSemaphore.new(3) + + await sema.acquire() + await sema.acquire() + await sema.acquire() + + check sema.count == 0 + sema.release() + sema.release() + sema.release() + check sema.count == 3 + + test "Should queue acquire": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + let fut = sema.acquire() + + check sema.count == 0 + sema.release() + sema.release() + check sema.count == 1 + + await sleepAsync(10.millis) + check fut.finished() + + test "Should keep count == size": + let sema = AsyncSemaphore.new(1) + sema.release() + sema.release() + sema.release() + check sema.count == 1 + + test "Should tryAcquire": + let sema = AsyncSemaphore.new(1) + await sema.acquire() + check sema.tryAcquire() == false + + test "Should tryAcquire and acquire": + let sema = AsyncSemaphore.new(4) + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.count == 0 + + let fut = sema.acquire() + check fut.finished == false + check sema.count == 0 + + sema.release() + sema.release() + sema.release() + sema.release() + sema.release() + + check fut.finished == true + check sema.count == 4 + + test "Should restrict resource access": + let sema = AsyncSemaphore.new(3) + var resource = 0 + + proc task() {.async.} = + try: + await sema.acquire() + resource.inc() + check resource > 0 and resource <= 3 + let sleep = rand(0..10).millis + # echo sleep + await sleepAsync(sleep) + finally: + resource.dec() + sema.release() + + var tasks: seq[Future[void]] + for i in 0..<10: + tasks.add(task()) + + await allFutures(tasks) + + test "Should cancel sequential semaphore slot": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + + let + tmp = sema.acquire() + tmp2 = sema.acquire() + check: + not tmp.finished() + not tmp2.finished() + + tmp.cancel() + sema.release() + + check tmp2.finished() + + sema.release() + + check await sema.acquire().withTimeout(10.millis) + + test "Should handle out of order cancellations": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() # 1st acquire + let tmp1 = sema.acquire() # 2nd acquire + check not tmp1.finished() + + let tmp2 = sema.acquire() # 3rd acquire + check not tmp2.finished() + + let tmp3 = sema.acquire() # 4th acquire + check not tmp3.finished() + + # up to this point, we've called acquire 4 times + tmp1.cancel() # 1st release (implicit) + tmp2.cancel() # 2nd release (implicit) + + check not tmp3.finished() # check that we didn't release the wrong slot + + sema.release() # 3rd release (explicit) + check tmp3.finished() + + sema.release() # 4th release + check await sema.acquire().withTimeout(10.millis) + + test "Should properly handle timeouts and cancellations": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + check not(await sema.acquire().withTimeout(1.millis)) # Should not acquire but cancel + sema.release() + + check await sema.acquire().withTimeout(10.millis) + + test 
"Should handle forceAcquire properly": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + check not(await sema.acquire().withTimeout(1.millis)) # Should not acquire but cancel + + let + fut1 = sema.acquire() + fut2 = sema.acquire() + + sema.forceAcquire() + sema.release() + + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + not fut2.finished() + + sema.release() + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + fut2.finished() + + + sema.forceAcquire() + sema.forceAcquire() + + let + fut3 = sema.acquire() + fut4 = sema.acquire() + fut5 = sema.acquire() + sema.release() + sema.release() + await sleepAsync(1.millis) + check: + fut3.finished() + fut4.finished() + not fut5.finished() diff --git a/tests/datastore/testdatabuffer.nim b/tests/datastore/testdatabuffer.nim new file mode 100644 index 00000000..eb979317 --- /dev/null +++ b/tests/datastore/testdatabuffer.nim @@ -0,0 +1,139 @@ +import std/options +import std/sequtils +import std/algorithm +import std/locks +import std/os +import pkg/stew/byteutils +import pkg/unittest2 +import pkg/questionable +import pkg/questionable/results +import pkg/datastore/key + +include ../../datastore/threads/databuffer + +var + shareVal: DataBuffer + lock: Lock + cond: Cond + +var threads: array[2,Thread[int]] + +proc thread1(val: int) {.thread.} = + echo "thread1" + {.cast(gcsafe).}: + for i in 0..