diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0e2e106d..3d85c89b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -134,3 +134,10 @@ jobs: nim --version echo nimble --verbose test + + - name: Build and run tests + run: | + source "${HOME}/.bash_env" + nim --version + echo + nimble --verbose -d:datastoreUseAsync=false test diff --git a/.gitignore b/.gitignore index 1a87a994..de000360 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ coverage datastore.nims nimcache TODO +nim.cfg +nimble.develop +nimble.paths diff --git a/config.nims b/config.nims index 052a576c..83d02dcc 100644 --- a/config.nims +++ b/config.nims @@ -3,6 +3,8 @@ --styleCheck:usages --styleCheck:error +# --d:"datastoreUseAsync=false" + when (NimMajor, NimMinor) == (1, 2): switch("hint", "Processing:off") switch("hint", "XDeclaredButNotUsed:off") @@ -10,3 +12,7 @@ when (NimMajor, NimMinor) == (1, 2): when (NimMajor, NimMinor) > (1, 2): switch("hint", "XCannotRaiseY:off") +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +# end Nimble config diff --git a/datastore.nim b/datastore.nim index 6d43a209..47c80bf3 100644 --- a/datastore.nim +++ b/datastore.nim @@ -4,4 +4,4 @@ import ./datastore/sql import ./datastore/mountedds import ./datastore/tieredds -export datastore, fsds, mountedds, tieredds, sql +export datastore, fsds, sql, mountedds, tieredds diff --git a/datastore.nimble b/datastore.nimble index 6252c6d1..4a75697f 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -6,14 +6,18 @@ author = "Status Research & Development GmbH" description = "Simple, unified API for multiple data stores" license = "Apache License 2.0 or MIT" -requires "nim >= 1.2.0", +requires "nim >= 1.6.14", "asynctest >= 0.3.1 & < 0.4.0", - "chronos", - "questionable >= 0.10.3 & < 0.11.0", + "chronos#0277b65be2c7a365ac13df002fba6e172be55537", + "questionable#head", "sqlite3_abi", "stew", "unittest2", - "upraises >= 0.1.0 & < 0.2.0" + "pretty", + "threading#7c033b9135b4baae84e6c89af7548848bba7dc02", # or more recent + "taskpools", + "upraises >= 0.1.0 & < 0.2.0", + "chronicles" task coverage, "generates code coverage report": var (output, exitCode) = gorgeEx("which lcov") diff --git a/datastore/datastore.nim b/datastore/datastore.nim index d7fa45fd..d570be1e 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -8,37 +8,44 @@ import ./types export key, query, types +const datastoreUseAsync* {.booldefine.} = true + push: {.upraises: [].} type BatchEntry* = tuple[key: Key, data: seq[byte]] -method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} = +method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown".} = +method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown".} = +method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown".} = +method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method put*(self: Datastore, key: Key, 
data: seq[byte]): Future[?!void] {.base, locks: "unknown".} = +method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown".} = +method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} = +method close*(self: Datastore): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method query*( - self: Datastore, - query: Query): Future[?!QueryIter] {.base, gcsafe.} = +method query*(self: Datastore, + query: Query + ): Future[?!QueryIter] {.base, gcsafe, raises: [].} = + + raiseAssert("Not implemented!") +method queryIter*(self: Datastore, + query: Query + ): ?!(iterator(): ?!QueryResponse) {.base, gcsafe, raises: [].} = raiseAssert("Not implemented!") -proc contains*(self: Datastore, key: Key): Future[bool] {.async.} = +proc contains*(self: Datastore, key: Key): Future[bool] {.async, raises: [].} = return (await self.has(key)) |? false diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 163a52ab..e22788ff 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -2,237 +2,298 @@ import std/os import std/options import std/strutils -import pkg/chronos import pkg/questionable import pkg/questionable/results from pkg/stew/results as stewResults import get, isErr import pkg/upraises +import pkg/chronos +import pkg/taskpools +import ./threads/fsbackend +import ./threads/threadproxy import ./datastore -export datastore +export datastore, threadproxy, fsbackend, Taskpool push: {.upraises: [].} -type - FSDatastore* = ref object of Datastore - root*: string - ignoreProtected: bool - depth: int +when datastoreUseAsync: + type + FSDatastore* = ref object of Datastore + db: ThreadProxy[FSBackend[KeyId, DataBuffer]] + + proc validDepth*(self: FSDatastore, key: Key): bool = + key.len <= self.db.backend.depth + + method has*(self: FSDatastore, + key: Key): Future[?!bool] {.async.} = + await self.db.has(key) + + method delete*(self: FSDatastore, + key: Key): Future[?!void] {.async.} = + await self.db.delete(key) + + method delete*(self: FSDatastore, + keys: seq[Key]): Future[?!void] {.async.} = + await self.db.delete(keys) + + method get*(self: FSDatastore, + key: Key): Future[?!seq[byte]] {.async.} = + await self.db.get(key) + + method put*(self: FSDatastore, + key: Key, + data: seq[byte]): Future[?!void] {.async.} = + await self.db.put(key, data) + + method put*(self: FSDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async.} = + await self.db.put(batch) -proc validDepth*(self: FSDatastore, key: Key): bool = - key.len <= self.depth + method query*(self: FSDatastore, + q: Query): Future[?!QueryIter] {.async.} = + await self.db.query(q) -proc isRootSubdir*(self: FSDatastore, path: string): bool = - path.startsWith(self.root) + method close*(self: FSDatastore): Future[?!void] {.async.} = + await self.db.close() -proc path*(self: FSDatastore, key: Key): ?!string = - ## Return filename corresponding to the key - ## or failure if the key doesn't correspond to a valid filename - ## + proc new*( + T: type FSDatastore, + root: string, + tp: Taskpool = Taskpool.new(4), + depth = 2, + caseSensitive = true, + ignoreProtected = false + ): ?!FSDatastore = - if not self.validDepth(key): - return failure "Path has invalid depth!" 
+ let + backend = ? newFSBackend[KeyId, DataBuffer]( + root = root, depth = depth, caseSensitive = caseSensitive, + ignoreProtected = ignoreProtected) + db = ? ThreadProxy.new(backend, tp = tp) + success FSDatastore(db: db) + +else: + type + FSDatastore* = ref object of Datastore + root*: string + ignoreProtected: bool + depth: int + + proc validDepth*(self: FSDatastore, key: Key): bool = + key.len <= self.depth - var - segments: seq[string] + proc isRootSubdir*(self: FSDatastore, path: string): bool = + path.startsWith(self.root) - for ns in key: - let basename = ns.value.extractFilename - if basename == "" or not basename.isValidFilename: - return failure "Filename contains invalid chars!" + proc path*(self: FSDatastore, key: Key): ?!string = + ## Return filename corresponding to the key + ## or failure if the key doesn't correspond to a valid filename + ## - if ns.field == "": - segments.add(ns.value) - else: - let basename = ns.field.extractFilename + if not self.validDepth(key): + return failure "Path has invalid depth!" + + var + segments: seq[string] + + for ns in key: + let basename = ns.value.extractFilename if basename == "" or not basename.isValidFilename: return failure "Filename contains invalid chars!" - # `:` are replaced with `/` - segments.add(ns.field / ns.value) + if ns.field == "": + segments.add(ns.value) + else: + let basename = ns.field.extractFilename + if basename == "" or not basename.isValidFilename: + return failure "Filename contains invalid chars!" - let - fullname = (self.root / segments.joinPath()) - .absolutePath() - .catch() - .get() - .addFileExt(FileExt) + # `:` are replaced with `/` + segments.add(ns.field / ns.value) - if not self.isRootSubdir(fullname): - return failure "Path is outside of `root` directory!" + let + fullname = (self.root / segments.joinPath()) + .absolutePath() + .catch() + .get() + .addFileExt(FileExt) - return success fullname + if not self.isRootSubdir(fullname): + return failure "Path is outside of `root` directory!" -method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} = - return self.path(key).?fileExists() + return success fullname -method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} = - without path =? self.path(key), error: - return failure error + method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} = + return self.path(key).?fileExists() - if not path.fileExists(): - return success() + method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} = + without path =? self.path(key), error: + return failure error - try: - removeFile(path) - except OSError as e: - return failure e + if not path.fileExists(): + return success() - return success() + try: + removeFile(path) + except OSError as e: + return failure e -method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} = - for key in keys: - if err =? (await self.delete(key)).errorOption: - return failure err + return success() - return success() + method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} = + for key in keys: + if err =? (await self.delete(key)).errorOption: + return failure err -proc readFile*(self: FSDatastore, path: string): ?!seq[byte] = - var - file: File + return success() - defer: - file.close + proc readFile*(self: FSDatastore, path: string): ?!seq[byte] = + var + file: File - if not file.open(path): - return failure "unable to open file!" 
+ defer: + file.close - try: - let - size = file.getFileSize + if not file.open(path): + return failure "unable to open file!" - var - bytes = newSeq[byte](size) - read = 0 + try: + let + size = file.getFileSize - while read < size: - read += file.readBytes(bytes, read, size) + var + bytes = newSeq[byte](size) + read = 0 - if read < size: - return failure $read & " bytes were read from " & path & - " but " & $size & " bytes were expected" + while read < size: + read += file.readBytes(bytes, read, size) - return success bytes + if read < size: + return failure $read & " bytes were read from " & path & + " but " & $size & " bytes were expected" - except CatchableError as e: - return failure e + return success bytes -method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} = - without path =? self.path(key), error: - return failure error + except CatchableError as e: + return failure e - if not path.fileExists(): - return failure( - newException(DatastoreKeyNotFound, "Key doesn't exist")) + method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} = + without path =? self.path(key), error: + return failure error - return self.readFile(path) + if not path.fileExists(): + return failure( + newException(DatastoreKeyNotFound, "Key doesn't exist")) -method put*( - self: FSDatastore, - key: Key, - data: seq[byte]): Future[?!void] {.async.} = + return self.readFile(path) - without path =? self.path(key), error: - return failure error + method put*( + self: FSDatastore, + key: Key, + data: seq[byte]): Future[?!void] {.async.} = - try: - createDir(parentDir(path)) - writeFile(path, data) - except CatchableError as e: - return failure e + without path =? self.path(key), error: + return failure error - return success() + try: + createDir(parentDir(path)) + writeFile(path, data) + except CatchableError as e: + return failure e -method put*( - self: FSDatastore, - batch: seq[BatchEntry]): Future[?!void] {.async.} = + return success() - for entry in batch: - if err =? (await self.put(entry.key, entry.data)).errorOption: - return failure err + method put*( + self: FSDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async.} = - return success() + for entry in batch: + if err =? (await self.put(entry.key, entry.data)).errorOption: + return failure err -proc dirWalker(path: string): iterator: string {.gcsafe.} = - return iterator(): string = - try: - for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true): - yield p - except CatchableError as exc: - raise newException(Defect, exc.msg) - -method close*(self: FSDatastore): Future[?!void] {.async.} = - return success() - -method query*( - self: FSDatastore, - query: Query): Future[?!QueryIter] {.async.} = - - without path =? 
self.path(query.key), error: - return failure error - - let basePath = - # it there is a file in the directory - # with the same name then list the contents - # of the directory, otherwise recurse - # into subdirectories - if path.fileExists: - path.parentDir - else: - path.changeFileExt("") - - let - walker = dirWalker(basePath) - - var - iter = QueryIter.new() - - proc next(): Future[?!QueryResponse] {.async.} = - let - path = walker() + return success() - if finished(walker): - iter.finished = true - return success (Key.none, EmptyBytes) + proc dirWalker(path: string): iterator: string {.gcsafe.} = + return iterator(): string = + try: + for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true): + yield p + except CatchableError as exc: + raise newException(Defect, exc.msg) - var - keyPath = basePath + method close*(self: FSDatastore): Future[?!void] {.async.} = + return success() - keyPath.removePrefix(self.root) - keyPath = keyPath / path.changeFileExt("") - keyPath = keyPath.replace("\\", "/") + method query*( + self: FSDatastore, + query: Query): Future[?!QueryIter] {.async.} = + + without path =? self.path(query.key), error: + return failure error + + let basePath = + # if there is a file in the directory + # with the same name then list the contents + # of the directory, otherwise recurse + # into subdirectories + if path.fileExists: + path.parentDir + else: + path.changeFileExt("") let - key = Key.init(keyPath).expect("should not fail") - data = - if query.value: - self.readFile((basePath / path).absolutePath) - .expect("Should read file") - else: - @[] - - return success (key.some, data) - - iter.next = next - return success iter - -proc new*( - T: type FSDatastore, - root: string, - depth = 2, - caseSensitive = true, - ignoreProtected = false): ?!T = - - let root = ? ( - block: - if root.isAbsolute: root - else: getCurrentDir() / root).catch - - if not dirExists(root): - return failure "directory does not exist: " & root - - success T( - root: root, - ignoreProtected: ignoreProtected, - depth: depth) + walker = dirWalker(basePath) + + var + iter = QueryIter.new() + + proc next(): Future[?!QueryResponse] {.async.} = + let + path = walker() + + if finished(walker): + iter.finished = true + return success (Key.none, EmptyBytes) + + var + keyPath = basePath + + keyPath.removePrefix(self.root) + keyPath = keyPath / path.changeFileExt("") + keyPath = keyPath.replace("\\", "/") + + let + key = Key.init(keyPath).expect("should not fail") + data = + if query.value: + self.readFile((basePath / path).absolutePath) + .expect("Should read file") + else: + @[] + + return success (key.some, data) + + iter.next = next + return success iter + + proc new*( + T: type FSDatastore, + root: string, + tp: Taskpool = nil, + depth = 2, + caseSensitive = true, + ignoreProtected = false): ?!FSDatastore = + + let root = ? 
( + block: + if root.isAbsolute: root + else: getCurrentDir() / root).catch + + if not dirExists(root): + return failure "directory does not exist: " & root + + success FSDatastore( + root: root, + ignoreProtected: ignoreProtected, + depth: depth) diff --git a/datastore/query.nim b/datastore/query.nim index 1c51a6e2..f10376fe 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -1,27 +1,24 @@ import pkg/upraises +import std/algorithm import pkg/chronos import pkg/questionable import pkg/questionable/results import ./key import ./types +import ./threads/backend + +export types +export options, SortOrder type - SortOrder* {.pure.} = enum - Assending, - Descending - Query* = object - key*: Key # Key to be queried - value*: bool # Flag to indicate if data should be returned - limit*: int # Max items to return - not available in all backends - offset*: int # Offset from which to start querying - not available in all backends - sort*: SortOrder # Sort order - not available in all backends + ## Front end types + Query* = DbQuery[Key] - QueryResponse* = tuple[key: ?Key, data: seq[byte]] - QueryEndedError* = object of DatastoreError + QueryResponse* = DbQueryResponse[Key, seq[byte]] - GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.} + GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe.} IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.} QueryIter* = ref object finished*: bool @@ -38,17 +35,13 @@ proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} = proc new*(T: type QueryIter, dispose = defaultDispose): T = QueryIter(dispose: dispose) -proc init*( - T: type Query, - key: Key, - value = true, - sort = SortOrder.Assending, - offset = 0, - limit = -1): T = - - T( - key: key, - value: value, - sort: sort, - offset: offset, - limit: limit) +proc init*(T: type Query, + key: Key, + value = true, + sort = SortOrder.Ascending, + offset = 0, + limit = -1): Query = + dbQuery[Key](key, value, sort, offset, limit) + +proc toKey*(key: KeyId): Key {.inline, raises: [].} = + Key.init($key.data).expect("expected valid key here but got `" & $key.data & "`") diff --git a/datastore/sql.nim b/datastore/sql.nim index 9c30c9e5..fa71d848 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -1,3 +1,295 @@ -import ./sql/sqliteds -export sqliteds +import std/os +import std/options +import std/strutils + +import pkg/questionable +import pkg/questionable/results +from pkg/stew/results as stewResults import get, isErr +import pkg/upraises +import pkg/chronos +import pkg/taskpools + +import ./threads/sqlbackend +import ./threads/threadproxy +import ./datastore + +export datastore, keys, query, Taskpool, Memory, DbExt + +push: {.upraises: [].} + +when datastoreUseAsync: + + type + SQLiteDatastore* = ref object of Datastore + db: ThreadProxy[SQLiteBackend[KeyId, DataBuffer]] + + proc path*(self: SQLiteDatastore): string = + self.db.backend.path() + + proc readOnly*(self: SQLiteDatastore): bool = + self.db.backend.readOnly() + + method has*(self: SQLiteDatastore, + key: Key): Future[?!bool] {.async.} = + await self.db.has(key) + + method delete*(self: SQLiteDatastore, + key: Key): Future[?!void] {.async.} = + await self.db.delete(key) + + method delete*(self: SQLiteDatastore, + keys: seq[Key]): Future[?!void] {.async.} = + await self.db.delete(keys) + + method get*(self: SQLiteDatastore, + key: Key): Future[?!seq[byte]] {.async.} = + await self.db.get(key) + + method put*(self: SQLiteDatastore, + key: Key, + data: seq[byte]): Future[?!void] 
{.async.} = + await self.db.put(key, data) + + method put*(self: SQLiteDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async.} = + await self.db.put(batch) + + method close*(self: SQLiteDatastore): Future[?!void] {.async.} = + await self.db.close() + + method query*(self: SQLiteDatastore, + q: Query): Future[?!QueryIter] {.async.} = + await self.db.query(q) + + proc new*( + T: type SQLiteDatastore, + path: string, + tp: Taskpool = Taskpool.new(4), + readOnly = false): ?!SQLiteDatastore = + + let + backend = ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly) + db = ? ThreadProxy.new(backend, tp = tp) + success SQLiteDatastore(db: db) + +else: + + type + SQLiteDatastore* = ref object of Datastore + readOnly: bool + db: SQLiteDsDb[string, seq[byte]] + + proc path*(self: SQLiteDatastore): string = + $self.db.dbPath + + proc `readOnly=`*(self: SQLiteDatastore): bool + {.error: "readOnly should not be assigned".} + + method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = + var + exists = false + + proc onData(s: RawStmtPtr) = + exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool + + if err =? self.db.containsStmt.query((key.id), onData).errorOption: + return failure err + + return success exists + + method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = + return self.db.deleteStmt.exec((key.id)) + + method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = + if err =? self.db.beginStmt.exec().errorOption: + return failure(err) + + for key in keys: + if err =? self.db.deleteStmt.exec((key.id)).errorOption: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure err.msg + + return failure err.msg + + if err =? self.db.endStmt.exec().errorOption: + return failure err.msg + + return success() + + method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = + # see comment in ./filesystem_datastore re: finer control of memory + # allocation in `method get`, could apply here as well if bytes were read + # incrementally with `sqlite3_blob_read` + + var + bytes: seq[byte] + + proc onData(s: RawStmtPtr) = + bytes = dataCol[seq[byte]](self.db.getDataCol) + + if err =? self.db.getStmt.query((key.id), onData).errorOption: + return failure(err) + + if bytes.len <= 0: + return failure( + newException(DatastoreKeyNotFound, "Key doesn't exist")) + + return success bytes + + method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = + return self.db.putStmt.exec((key.id, data, timestamp())) + + method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = + if err =? self.db.beginStmt.exec().errorOption: + return failure err + + for entry in batch: + if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure err + + return failure err + + if err =? 
self.db.endStmt.exec().errorOption: + return failure err + + return success() + + method close*(self: SQLiteDatastore): Future[?!void] {.async.} = + self.db.close() + + return success() + + method query*( + self: SQLiteDatastore, + query: Query): Future[?!QueryIter] {.async.} = + + var + iter = QueryIter() + queryStr = if query.value: + QueryStmtDataIdStr + else: + QueryStmtIdStr + + if query.sort == SortOrder.Descending: + queryStr &= QueryStmtOrderDescending + else: + queryStr &= QueryStmtOrderAscending + + if query.limit != 0: + queryStr &= QueryStmtLimit + + if query.offset != 0: + queryStr &= QueryStmtOffset + + let + queryStmt = QueryStmt.prepare( + self.db.env, queryStr).expect("should not fail") + + s = RawStmtPtr(queryStmt) + + var + v = sqlite3_bind_text( + s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE) + + if not (v == SQLITE_OK): + return failure newException(DatastoreError, $sqlite3_errstr(v)) + + if query.limit != 0: + v = sqlite3_bind_int(s, 2.cint, query.limit.cint) + + if not (v == SQLITE_OK): + return failure newException(DatastoreError, $sqlite3_errstr(v)) + + if query.offset != 0: + v = sqlite3_bind_int(s, 3.cint, query.offset.cint) + + if not (v == SQLITE_OK): + return failure newException(DatastoreError, $sqlite3_errstr(v)) + + proc next(): Future[?!QueryResponse] {.async.} = + if iter.finished: + return failure(newException(QueryEndedError, "Calling next on a finished query!")) + + let + v = sqlite3_step(s) + + case v + of SQLITE_ROW: + let + key = Key.init( + $sqlite3_column_text_not_null(s, QueryStmtIdCol)) + .expect("should not fail") + + blob: ?pointer = + if query.value: + sqlite3_column_blob(s, QueryStmtDataCol).some + else: + pointer.none + + # detect out-of-memory error + # see the conversion table and final paragraph of: + # https://www.sqlite.org/c3ref/column_blob.html + # see also https://www.sqlite.org/rescode.html + + # the "data" column can be NULL so in order to detect an out-of-memory + # error it is necessary to check that the result is a null pointer and + # that the result code is an error code + if blob.isSome and blob.get().isNil: + let + v = sqlite3_errcode(sqlite3_db_handle(s)) + + if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]): + iter.finished = true + return failure newException(DatastoreError, $sqlite3_errstr(v)) + + let + dataLen = sqlite3_column_bytes(s, QueryStmtDataCol) + data = if blob.isSome: + @( + toOpenArray(cast[ptr UncheckedArray[byte]](blob.get), + 0, + dataLen - 1)) + else: + @[] + + return success (key.some, data) + of SQLITE_DONE: + iter.finished = true + return success (Key.none, EmptyBytes) + else: + iter.finished = true + return failure newException(DatastoreError, $sqlite3_errstr(v)) + + iter.dispose = proc(): Future[?!void] {.async.} = + discard sqlite3_reset(s) + discard sqlite3_clear_bindings(s) + s.dispose + return success() + + iter.next = next + return success iter + + proc new*( + T: type SQLiteDatastore, + path: string, + tp: Taskpool = nil, + readOnly = false): ?!T = + + let + flags = + if readOnly: SQLITE_OPEN_READONLY + else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE + + success SQLiteDatastore( + db: ? 
SQLiteDsDb[string, seq[byte]].open(path, flags), + readOnly: readOnly) + + proc new*( + T: type SQLiteDatastore, + db: SQLiteDsDb): ?!SQLiteDatastore = + + success SQLiteDatastore( + db: db, + readOnly: db.readOnly) \ No newline at end of file diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim deleted file mode 100644 index aa632747..00000000 --- a/datastore/sql/sqliteds.nim +++ /dev/null @@ -1,237 +0,0 @@ -import std/times -import std/options - -import pkg/chronos -import pkg/questionable -import pkg/questionable/results -import pkg/sqlite3_abi -from pkg/stew/results as stewResults import isErr -import pkg/upraises - -import ../datastore -import ./sqlitedsdb - -export datastore, sqlitedsdb - -push: {.upraises: [].} - -type - SQLiteDatastore* = ref object of Datastore - readOnly: bool - db: SQLiteDsDb - -proc path*(self: SQLiteDatastore): string = - self.db.dbPath - -proc `readOnly=`*(self: SQLiteDatastore): bool - {.error: "readOnly should not be assigned".} - -proc timestamp*(t = epochTime()): int64 = - (t * 1_000_000).int64 - -method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = - var - exists = false - - proc onData(s: RawStmtPtr) = - exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool - - if err =? self.db.containsStmt.query((key.id), onData).errorOption: - return failure err - - return success exists - -method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = - return self.db.deleteStmt.exec((key.id)) - -method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = - if err =? self.db.beginStmt.exec().errorOption: - return failure(err) - - for key in keys: - if err =? self.db.deleteStmt.exec((key.id)).errorOption: - if err =? self.db.rollbackStmt.exec().errorOption: - return failure err.msg - - return failure err.msg - - if err =? self.db.endStmt.exec().errorOption: - return failure err.msg - - return success() - -method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = - # see comment in ./filesystem_datastore re: finer control of memory - # allocation in `method get`, could apply here as well if bytes were read - # incrementally with `sqlite3_blob_read` - - var - bytes: seq[byte] - - proc onData(s: RawStmtPtr) = - bytes = self.db.getDataCol() - - if err =? self.db.getStmt.query((key.id), onData).errorOption: - return failure(err) - - if bytes.len <= 0: - return failure( - newException(DatastoreKeyNotFound, "Key doesn't exist")) - - return success bytes - -method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - return self.db.putStmt.exec((key.id, data, timestamp())) - -method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = - if err =? self.db.beginStmt.exec().errorOption: - return failure err - - for entry in batch: - if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: - if err =? self.db.rollbackStmt.exec().errorOption: - return failure err - - return failure err - - if err =? 
self.db.endStmt.exec().errorOption: - return failure err - - return success() - -method close*(self: SQLiteDatastore): Future[?!void] {.async.} = - self.db.close() - - return success() - -method query*( - self: SQLiteDatastore, - query: Query): Future[?!QueryIter] {.async.} = - - var - iter = QueryIter() - queryStr = if query.value: - QueryStmtDataIdStr - else: - QueryStmtIdStr - - if query.sort == SortOrder.Descending: - queryStr &= QueryStmtOrderDescending - else: - queryStr &= QueryStmtOrderAscending - - if query.limit != 0: - queryStr &= QueryStmtLimit - - if query.offset != 0: - queryStr &= QueryStmtOffset - - let - queryStmt = QueryStmt.prepare( - self.db.env, queryStr).expect("should not fail") - - s = RawStmtPtr(queryStmt) - - var - v = sqlite3_bind_text( - s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE) - - if not (v == SQLITE_OK): - return failure newException(DatastoreError, $sqlite3_errstr(v)) - - if query.limit != 0: - v = sqlite3_bind_int(s, 2.cint, query.limit.cint) - - if not (v == SQLITE_OK): - return failure newException(DatastoreError, $sqlite3_errstr(v)) - - if query.offset != 0: - v = sqlite3_bind_int(s, 3.cint, query.offset.cint) - - if not (v == SQLITE_OK): - return failure newException(DatastoreError, $sqlite3_errstr(v)) - - proc next(): Future[?!QueryResponse] {.async.} = - if iter.finished: - return failure(newException(QueryEndedError, "Calling next on a finished query!")) - - let - v = sqlite3_step(s) - - case v - of SQLITE_ROW: - let - key = Key.init( - $sqlite3_column_text_not_null(s, QueryStmtIdCol)) - .expect("should not fail") - - blob: ?pointer = - if query.value: - sqlite3_column_blob(s, QueryStmtDataCol).some - else: - pointer.none - - # detect out-of-memory error - # see the conversion table and final paragraph of: - # https://www.sqlite.org/c3ref/column_blob.html - # see also https://www.sqlite.org/rescode.html - - # the "data" column can be NULL so in order to detect an out-of-memory - # error it is necessary to check that the result is a null pointer and - # that the result code is an error code - if blob.isSome and blob.get().isNil: - let - v = sqlite3_errcode(sqlite3_db_handle(s)) - - if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]): - iter.finished = true - return failure newException(DatastoreError, $sqlite3_errstr(v)) - - let - dataLen = sqlite3_column_bytes(s, QueryStmtDataCol) - data = if blob.isSome: - @( - toOpenArray(cast[ptr UncheckedArray[byte]](blob.get), - 0, - dataLen - 1)) - else: - @[] - - return success (key.some, data) - of SQLITE_DONE: - iter.finished = true - return success (Key.none, EmptyBytes) - else: - iter.finished = true - return failure newException(DatastoreError, $sqlite3_errstr(v)) - - iter.dispose = proc(): Future[?!void] {.async.} = - discard sqlite3_reset(s) - discard sqlite3_clear_bindings(s) - s.dispose - return success() - - iter.next = next - return success iter - -proc new*( - T: type SQLiteDatastore, - path: string, - readOnly = false): ?!T = - - let - flags = - if readOnly: SQLITE_OPEN_READONLY - else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE - - success T( - db: ? 
SQLiteDsDb.open(path, flags), - readOnly: readOnly) - -proc new*( - T: type SQLiteDatastore, - db: SQLiteDsDb): ?!T = - - success T( - db: db, - readOnly: db.readOnly) diff --git a/datastore/sql/sqlitedsdb.nim b/datastore/sql/sqlitedsdb.nim index 503dea45..06e1dcff 100644 --- a/datastore/sql/sqlitedsdb.nim +++ b/datastore/sql/sqlitedsdb.nim @@ -4,36 +4,37 @@ import pkg/questionable import pkg/questionable/results import pkg/upraises +import ../threads/backend import ./sqliteutils export sqliteutils type BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].} - BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].} + BoundDataCol* = proc (): DataBuffer {.closure, gcsafe, upraises: [].} BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].} # feels odd to use `void` for prepared statements corresponding to SELECT # queries but it fits with the rest of the SQLite wrapper adapted from # status-im/nwaku, at least in its current form in ./sqlite - ContainsStmt* = SQLiteStmt[(string), void] - DeleteStmt* = SQLiteStmt[(string), void] - GetStmt* = SQLiteStmt[(string), void] - PutStmt* = SQLiteStmt[(string, seq[byte], int64), void] + ContainsStmt*[K] = SQLiteStmt[(K), void] + DeleteStmt*[K] = SQLiteStmt[(K), void] + GetStmt*[K] = SQLiteStmt[(K), void] + PutStmt*[K, V] = SQLiteStmt[(K, V, int64), void] QueryStmt* = SQLiteStmt[(string), void] BeginStmt* = NoParamsStmt EndStmt* = NoParamsStmt RollbackStmt* = NoParamsStmt - SQLiteDsDb* = object + SQLiteDsDb*[K: DbKey, V: DbVal] = object readOnly*: bool - dbPath*: string - containsStmt*: ContainsStmt - deleteStmt*: DeleteStmt + dbPath*: DataBuffer + containsStmt*: ContainsStmt[K] + deleteStmt*: DeleteStmt[K] env*: SQLite - getDataCol*: BoundDataCol - getStmt*: GetStmt - putStmt*: PutStmt + getDataCol*: (RawStmtPtr, int) + getStmt*: GetStmt[K] + putStmt*: PutStmt[K,V] beginStmt*: BeginStmt endStmt*: EndStmt rollbackStmt*: RollbackStmt @@ -156,37 +157,38 @@ proc idCol*( return proc (): string = $sqlite3_column_text_not_null(s, index.cint) -proc dataCol*( - s: RawStmtPtr, - index: int): BoundDataCol = +proc dataCol*[V: DbVal](data: (RawStmtPtr, int)): V = + + let s = data[0] + let index = data[1] checkColMetadata(s, index, DataColName) - return proc (): seq[byte] = + let + i = index.cint + blob = sqlite3_column_blob(s, i) + + # detect out-of-memory error + # see the conversion table and final paragraph of: + # https://www.sqlite.org/c3ref/column_blob.html + # see also https://www.sqlite.org/rescode.html + + # the "data" column can be NULL so in order to detect an out-of-memory error + # it is necessary to check that the result is a null pointer and that the + # result code is an error code + if blob.isNil: let - i = index.cint - blob = sqlite3_column_blob(s, i) - - # detect out-of-memory error - # see the conversion table and final paragraph of: - # https://www.sqlite.org/c3ref/column_blob.html - # see also https://www.sqlite.org/rescode.html + v = sqlite3_errcode(sqlite3_db_handle(s)) - # the "data" column can be NULL so in order to detect an out-of-memory error - # it is necessary to check that the result is a null pointer and that the - # result code is an error code - if blob.isNil: - let - v = sqlite3_errcode(sqlite3_db_handle(s)) + if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]): + raise (ref Defect)(msg: $sqlite3_errstr(v)) - if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]): - raise (ref Defect)(msg: $sqlite3_errstr(v)) - - let - dataLen = sqlite3_column_bytes(s, i) - dataBytes = cast[ptr 
UncheckedArray[byte]](blob) + let + dataLen = sqlite3_column_bytes(s, i) + dataBytes = cast[ptr UncheckedArray[byte]](blob) - @(toOpenArray(dataBytes, 0, dataLen - 1)) + # copy data out, since sqlite will free it + V.toVal(toOpenArray(dataBytes, 0, dataLen - 1)) proc timestampCol*( s: RawStmtPtr, @@ -211,7 +213,7 @@ proc getDBFilePath*(path: string): ?!string = except CatchableError as exc: return failure(exc.msg) -proc close*(self: SQLiteDsDb) = +proc close*[K, V](self: SQLiteDsDb[K, V]) = self.containsStmt.dispose self.getStmt.dispose self.beginStmt.dispose @@ -226,10 +228,10 @@ proc close*(self: SQLiteDsDb) = self.env.dispose -proc open*( - T: type SQLiteDsDb, +proc open*[K,V]( + T: type SQLiteDsDb[K,V], path = Memory, - flags = SQLITE_OPEN_READONLY): ?!SQLiteDsDb = + flags = SQLITE_OPEN_READONLY): ?!SQLiteDsDb[K, V] = # make it optional to enable WAL with it enabled being the default? @@ -262,10 +264,10 @@ proc open*( checkExec(pragmaStmt) var - containsStmt: ContainsStmt - deleteStmt: DeleteStmt - getStmt: GetStmt - putStmt: PutStmt + containsStmt: ContainsStmt[K] + deleteStmt: DeleteStmt[K] + getStmt: GetStmt[K] + putStmt: PutStmt[K,V] beginStmt: BeginStmt endStmt: EndStmt rollbackStmt: RollbackStmt @@ -273,10 +275,10 @@ proc open*( if not readOnly: checkExec(env.val, CreateStmtStr) - deleteStmt = ? DeleteStmt.prepare( + deleteStmt = ? DeleteStmt[K].prepare( env.val, DeleteStmtStr, SQLITE_PREPARE_PERSISTENT) - putStmt = ? PutStmt.prepare( + putStmt = ? PutStmt[K,V].prepare( env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT) beginStmt = ? BeginStmt.prepare( @@ -288,10 +290,10 @@ proc open*( rollbackStmt = ? RollbackStmt.prepare( env.val, RollbackTransactionStr, SQLITE_PREPARE_PERSISTENT) - containsStmt = ? ContainsStmt.prepare( + containsStmt = ? ContainsStmt[K].prepare( env.val, ContainsStmtStr, SQLITE_PREPARE_PERSISTENT) - getStmt = ? GetStmt.prepare( + getStmt = ? GetStmt[K].prepare( env.val, GetStmtStr, SQLITE_PREPARE_PERSISTENT) # if a readOnly/existing database does not satisfy the expected schema @@ -299,11 +301,11 @@ proc open*( # "SQL logic error" let - getDataCol = dataCol(RawStmtPtr(getStmt), GetStmtDataCol) + getDataCol = (RawStmtPtr(getStmt), GetStmtDataCol) - success T( + success SQLiteDsDb[K,V]( readOnly: readOnly, - dbPath: path, + dbPath: DataBuffer.new path, containsStmt: containsStmt, deleteStmt: deleteStmt, env: env.release, diff --git a/datastore/sql/sqliteutils.nim b/datastore/sql/sqliteutils.nim index bffb6e8c..ea9bbcb1 100644 --- a/datastore/sql/sqliteutils.nim +++ b/datastore/sql/sqliteutils.nim @@ -3,6 +3,8 @@ import pkg/questionable/results import pkg/sqlite3_abi import pkg/upraises +import ../threads/backend + export sqlite3_abi # Adapted from: @@ -20,7 +22,7 @@ type AutoDisposed*[T: ptr|ref] = object val*: T - DataProc* = proc(s: RawStmtPtr) {.closure, gcsafe.} + DataProc* = proc(s: RawStmtPtr) {.gcsafe.} NoParams* = tuple # empty tuple @@ -44,13 +46,14 @@ proc bindParam( n: int, val: auto): cint = - when val is openArray[byte]|seq[byte]: + when val is openArray[byte]|seq[byte]|DataBuffer: if val.len > 0: # `SQLITE_TRANSIENT` "indicate[s] that the object is to be copied prior # to the return from sqlite3_bind_*(). The object and pointer to it # must remain valid until then. SQLite will then manage the lifetime of # its private copy." 
- sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, + var val = val + sqlite3_bind_blob(s, n.cint, addr val[0], val.len.cint, SQLITE_TRANSIENT) else: sqlite3_bind_null(s, n.cint) @@ -66,7 +69,10 @@ proc bindParam( # to the return from sqlite3_bind_*(). The object and pointer to it must # remain valid until then. SQLite will then manage the lifetime of its # private copy." - sqlite3_bind_text(s, n.cint, val.cstring, -1.cint, SQLITE_TRANSIENT) + sqlite3_bind_text(s, n.cint, val.cstring, val.len().cint, SQLITE_TRANSIENT) + elif val is KeyId: + # same as previous + sqlite3_bind_text(s, n.cint, cast[cstring](baseAddr val.data), val.data.len().cint, SQLITE_TRANSIENT) else: {.fatal: "Please add support for the '" & $typeof(val) & "' type".} diff --git a/datastore/threads/asyncsemaphore.nim b/datastore/threads/asyncsemaphore.nim new file mode 100644 index 00000000..b561882c --- /dev/null +++ b/datastore/threads/asyncsemaphore.nim @@ -0,0 +1,101 @@ +# Nim-Libp2p +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.push raises: [].} + +import sequtils +import chronos, chronicles + +# TODO: this should probably go in chronos + +logScope: + topics = "datastore semaphore" + +type + AsyncSemaphore* = ref object of RootObj + size*: int + count: int + exit: bool + queue: seq[Future[void]] + +func new*(_: type AsyncSemaphore, size: int): AsyncSemaphore = + AsyncSemaphore(size: size, count: size) + +proc `count`*(s: AsyncSemaphore): int = s.count + +proc closeAll*(s: AsyncSemaphore) {.async.} = + s.exit = true + await allFutures(s.queue) + +proc tryAcquire*(s: AsyncSemaphore): bool = + ## Attempts to acquire a resource, if successful + ## returns true, otherwise false + ## + + if s.count > 0 and s.queue.len == 0 and not s.exit: + s.count.dec + trace "Acquired slot", available = s.count, queue = s.queue.len + return true + +proc acquire*(s: AsyncSemaphore): Future[void] = + ## Acquire a resource and decrement the resource + ## counter. If no more resources are available, + ## the returned future will not complete until + ## the resource count goes above 0. + ## + + let fut = newFuture[void]("AsyncSemaphore.acquire") + if s.tryAcquire(): + fut.complete() + return fut + + proc cancellation(udata: pointer) {.gcsafe.} = + fut.cancelCallback = nil + if not fut.finished: + s.queue.keepItIf( it != fut ) + + fut.cancelCallback = cancellation + + s.queue.add(fut) + + trace "Queued slot", available = s.count, queue = s.queue.len + return fut + +proc forceAcquire*(s: AsyncSemaphore) = + ## ForceAcquire will always succeed, + ## creating a temporary slot if required. 
+ ## This temporary slot will stay usable until + ## there are fewer `acquire`s than `release`s + s.count.dec + +proc release*(s: AsyncSemaphore) = + ## Release a resource from the semaphore, + ## by picking the first future from the queue + ## and completing it and incrementing the + ## internal resource count + ## + + doAssert(s.count <= s.size) + + if s.count < s.size: + trace "Releasing slot", available = s.count, + queue = s.queue.len + + s.count.inc + while s.queue.len > 0: + var fut = s.queue[0] + s.queue.delete(0) + if not fut.finished(): + s.count.dec + fut.complete() + break + + trace "Released slot", available = s.count, + queue = s.queue.len + return diff --git a/datastore/threads/backend.nim b/datastore/threads/backend.nim new file mode 100644 index 00000000..2faf9b76 --- /dev/null +++ b/datastore/threads/backend.nim @@ -0,0 +1,66 @@ +import std/algorithm +import std/options + +import pkg/questionable/results + +import ./databuffer +import ../types +import ../key + +export databuffer, types, key, SortOrder + +## Types for datastore backends. +## +## These should be synchronous and work with both GC types +## and `DataBuffer`s. This makes it easier to make them threadsafe. +## + +type + DbQueryResponse*[K, V] = tuple[key: Option[K], data: V] + + DbQuery*[K] = object + key*: K # Key to be queried + value*: bool # Flag to indicate if data should be returned + limit*: int # Max items to return - not available in all backends + offset*: int # Offset from which to start querying - not available in all backends + sort*: SortOrder # Sort order - not available in all backends + + KeyId* = object + ## serialized Key ID, equivalent to `key.id()` + data*: DataBuffer + + ## Accepted backend key and value types + DbKey* = string | KeyId + DbVal* = seq[byte] | DataBuffer + + DbBatchEntry*[K, V] = tuple[key: K, data: V] + + DbQueryHandle*[K, V, T] = object + query*: DbQuery[K] + cancel*: bool + closed*: bool + env*: T + +proc dbQuery*[K]( + key: K, + value = false, + sort = SortOrder.Ascending, + offset = 0, + limit = -1 +): DbQuery[K] = + DbQuery[K](key: key, value: value, sort: sort, offset: offset, limit: limit) + +proc `$`*(id: KeyId): string = $(id.data) + +proc toKey*(tp: typedesc[KeyId], id: string|cstring): KeyId = KeyId.new($id) +proc toKey*(tp: typedesc[string], id: string|cstring): string = $(id) +# proc toKey*(tp: typedesc[Key], id: string|cstring): KeyId = Key.init($id).expect("valid key") + +template toVal*(tp: typedesc[DataBuffer], id: openArray[byte]): DataBuffer = DataBuffer.new(id) +template toVal*(tp: typedesc[seq[byte]], id: openArray[byte]): seq[byte] = @(id) + +proc new*(tp: typedesc[KeyId], id: cstring): KeyId = + KeyId(data: DataBuffer.new(id.toOpenArray(0, id.len()-1))) + +proc new*(tp: typedesc[KeyId], id: string): KeyId = + KeyId(data: DataBuffer.new(id)) diff --git a/datastore/threads/databuffer.nim b/datastore/threads/databuffer.nim new file mode 100644 index 00000000..dc8b927d --- /dev/null +++ b/datastore/threads/databuffer.nim @@ -0,0 +1,141 @@ +import threading/smartptrs +import std/hashes +import pkg/stew/ptrops + +export hashes + +type + DataBufferOpt* = enum + dbNullTerminate + + DataBufferHolder* = object + buf: ptr UncheckedArray[byte] + size: int + cap: int + + DataBuffer* = SharedPtr[DataBufferHolder] ##\ + ## A fixed length data buffer using a SharedPtr. + ## It is thread safe even with `refc` since + ## it doesn't use string or seq types internally. 
+ ## +proc `=destroy`*(x: var DataBufferHolder) = + ## free the underlying shared buffer once the last reference is gone + ## + + if x.buf != nil: + # echo "buffer: FREE: ", repr x.buf.pointer + deallocShared(x.buf) + +proc len*(a: DataBuffer): int = + if a.isNil: 0 else: a[].size +proc capacity*(a: DataBuffer): int = + if a.isNil: 0 else: a[].cap + +proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a) + +proc hash*(a: DataBuffer): Hash = + a[].buf.toOpenArray(0, a[].size-1).hash() + +proc `[]`*(db: DataBuffer, idx: int): var byte = + if idx >= db.len(): + raise newException(IndexDefect, "index out of bounds") + db[].buf[idx] + +template `==`*[T: char | byte](a: DataBuffer, b: openArray[T]): bool = + if a.isNil: false + elif a[].size != b.len: false + else: a.hash() == b.hash() + +proc new*(tp: type DataBuffer, size: int = 0): DataBuffer = + ## allocate a new zeroed buffer with the given capacity + ## + + newSharedPtr(DataBufferHolder( + buf: cast[typeof(result[].buf)](allocShared0(size)), + size: size, + cap: size, + )) + +proc new*[T: byte | char](tp: type DataBuffer, data: openArray[T], opts: set[DataBufferOpt] = {}): DataBuffer = + ## allocate a new buffer and copy in the data from the openArray + ## + let dataCap = + if dbNullTerminate in opts: data.len() + 1 + else: data.len() + result = DataBuffer.new(dataCap) + if data.len() > 0: + copyMem(result[].buf, baseAddr data, data.len()) + result[].size = data.len() + +# proc new*(tp: type DataBuffer, data: pointer, first, last: int): DataBuffer = +# DataBuffer.new(toOpenArray(cast[ptr UncheckedArray[byte]](data), first, last)) + +proc baseAddr*(db: DataBuffer): pointer = + db[].buf + +proc clear*(db: DataBuffer) = + zeroMem(db[].buf, db[].cap) + db[].size = 0 + +proc setData*[T: byte | char](db: DataBuffer, data: openArray[T]) = + ## copy the data from the openArray into the existing buffer + ## + if data.len() > db[].cap: + raise newException(IndexDefect, "data too large for buffer") + db.clear() # this is expensive, but we can optimize later + copyMem(db[].buf, baseAddr data, data.len()) + db[].size = data.len() + +proc toSequence*(self: DataBuffer): seq[byte] = + ## convert the buffer to a seq[byte] using a copy + ## + + result = newSeq[byte](self.len) + if self.len() > 0: + copyMem(addr result[0], addr self[].buf[0], self.len) + +proc `@`*(self: DataBuffer): seq[byte] = + ## Convert a buffer to a seq[byte] + ## using a copy + ## + + self.toSequence() + +proc toString*(data: DataBuffer): string = + ## convert buffer to string type using copy + ## + + if data.isNil: return "" + result = newString(data.len()) + if data.len() > 0: + copyMem(addr result[0], addr data[].buf[0], data.len) + +proc `$`*(data: DataBuffer): string = + ## convert buffer to string type using copy + ## + + data.toString() + +proc `==`*(a, b: DataBuffer): bool = + if a.isNil and b.isNil: result = true + elif a.isNil or b.isNil: result = false + elif a[].size != b[].size: result = false + elif a[].buf == b[].buf: result = true + else: result = a.hash() == b.hash() + +converter toBuffer*(err: ref CatchableError): DataBuffer = + ## convert an exception's message to a DataBuffer + ## + + return DataBuffer.new(err.msg) + +template toOpenArray*[T: byte | char](data: var DataBuffer, t: typedesc[T]): var openArray[T] = + ## get an openArray view of the DataBuffer as char or byte + ## + ## this is explicit since sqlite treats string differently from openArray[byte] + var bf = cast[ptr UncheckedArray[T]](data[].buf) + bf.toOpenArray(0, data[].size-1) + +template toOpenArray*(data: var DataBuffer, first, last: 
int): var openArray[byte] = + toOpenArray(data, byte).toOpenArray(first, last) diff --git a/datastore/threads/fsbackend.nim b/datastore/threads/fsbackend.nim new file mode 100644 index 00000000..9a22f25b --- /dev/null +++ b/datastore/threads/fsbackend.nim @@ -0,0 +1,317 @@ +import std/os +import std/options +import std/strutils +import std/tempfiles + +import pkg/questionable +import pkg/questionable/results +from pkg/stew/results as stewResults import get, isErr +import pkg/upraises + +import ./backend + +export backend + +push: {.upraises: [].} + +import std/sharedtables + +type + KeyLock = tuple[locked: bool] + +var keyTable: SharedTable[KeyId, KeyLock] +keyTable.init() + +template lockKeyImpl(key: KeyId, blk: untyped): untyped = + var hasLock = false + try: + while not hasLock: + keyTable.withKey(key) do (k: KeyId, klock: var KeyLock, exists: var bool): + if not exists or not klock.locked: + klock.locked = true + exists = true + hasLock = klock.locked + + `blk` + finally: + if hasLock: + keyTable.withKey(key) do (k: KeyId, klock: var KeyLock, exists: var bool): + assert exists and klock.locked + klock.locked = false + exists = false + +template withReadLock(key: KeyId, blk: untyped): untyped = + lockKeyImpl(key, blk) + +template withWriteLock(key: KeyId, blk: untyped): untyped = + lockKeyImpl(key, blk) + +type + FSBackend*[K, V] = object + root*: DataBuffer + ignoreProtected: bool + depth*: int + +proc isRootSubdir*(root, path: string): bool = + path.startsWith(root) + +proc validDepth*(self: FSBackend, key: Key): bool = + key.len <= self.depth + +proc findPath*[K,V](self: FSBackend[K,V], key: K): ?!string = + ## Return filename corresponding to the key + ## or failure if the key doesn't correspond to a valid filename + ## + let root = $self.root + let key = Key.init($key).get() + if not self.validDepth(key): + return failure "Path has invalid depth!" + + var + segments: seq[string] + + for ns in key: + let basename = ns.value.extractFilename + if basename == "" or not basename.isValidFilename: + return failure "Filename contains invalid chars!" + + if ns.field == "": + segments.add(ns.value) + else: + let basename = ns.field.extractFilename + if basename == "" or not basename.isValidFilename: + return failure "Filename contains invalid chars!" + + # `:` are replaced with `/` + segments.add(ns.field / ns.value) + + let + fullname = (root / segments.joinPath()) + .absolutePath() + .catch() + .get() + .addFileExt(FileExt) + + if not root.isRootSubdir(fullname): + return failure "Path is outside of `root` directory!" + + return success fullname + +proc has*[K,V](self: FSBackend[K,V], key: K): ?!bool = + without path =? self.findPath(key), error: + return failure error + withReadLock(key): + success path.fileExists() + +proc contains*[K](self: FSBackend, key: K): bool = + return self.has(key).get() + +proc delete*[K,V](self: FSBackend[K,V], key: K): ?!void = + without path =? self.findPath(key), error: + return failure error + + if not path.fileExists(): + return success() + + try: + withWriteLock(key): + removeFile(path) + except OSError as e: + return failure e + + return success() + +proc delete*[K,V](self: FSBackend[K,V], keys: openArray[K]): ?!void = + for key in keys: + if err =? self.delete(key).errorOption: + return failure err + + return success() + +proc readFile[K, V](self: FSBackend, key: K, path: string): ?!V = + var + file: File + + defer: + file.close + + withReadLock(key): + if not file.open(path): + return failure "unable to open file! 
path: " & path + + try: + let + size = file.getFileSize().int + + when V is seq[byte]: + var bytes = newSeq[byte](size) + elif V is V: + var bytes = V.new(size=size) + else: + {.error: "unhandled result type".} + var + read = 0 + + # echo "BYTES: ", bytes.repr + while read < size: + read += file.readBytes(bytes.toOpenArray(0, size-1), read, size) + + if read < size: + return failure $read & " bytes were read from " & path & + " but " & $size & " bytes were expected" + + return success bytes + + except CatchableError as e: + return failure e + +proc get*[K,V](self: FSBackend[K,V], key: K): ?!V = + without path =? self.findPath(key), error: + return failure error + + if not path.fileExists(): + return failure( + newException(DatastoreKeyNotFound, "Key doesn't exist")) + + return readFile[K, V](self, key, path) + +proc put*[K,V](self: FSBackend[K,V], + key: K, + data: V + ): ?!void = + + without path =? self.findPath(key), error: + return failure error + + try: + var data = data + withWriteLock(KeyId.new path): + createDir(parentDir(path)) + + let tmpPath = genTempPath("temp", path.splitPath.tail) + writeFile(tmpPath, data.toOpenArray(0, data.len()-1)) + + withWriteLock(key): + try: + moveFile(tmpPath, path) + except Exception as e: + return failure e.msg + except CatchableError as e: + return failure e + + return success() + +proc put*[K,V]( + self: FSBackend, + batch: seq[DbBatchEntry[K, V]]): ?!void = + + for entry in batch: + if err =? self.put(entry.key, entry.data).errorOption: + return failure err + + return success() + +iterator dirIter(path: string): string {.gcsafe.} = + try: + for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true): + yield p + except CatchableError as exc: + raise newException(Defect, exc.msg) + +proc close*[K,V](self: FSBackend[K,V]): ?!void = + return success() + +type + FsQueryHandle*[K, V] = object + query*: DbQuery[K] + cancel*: bool + closed*: bool + env*: FsQueryEnv[K,V] + + FsQueryEnv*[K,V] = object + self: FSBackend[K,V] + basePath: DataBuffer + +proc query*[K,V]( + self: FSBackend[K,V], + query: DbQuery[K], +): Result[FsQueryHandle[K, V], ref CatchableError] = + + let key = query.key + without path =? 
self.findPath(key), error: + return failure error + + let basePath = + # if there is a file in the directory + # with the same name then list the contents + # of the directory, otherwise recurse + # into subdirectories + if path.fileExists: + path.parentDir + else: + path.changeFileExt("") + + let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath)) + success FsQueryHandle[K, V](query: query, env: env) + +proc close*[K,V](handle: var FsQueryHandle[K,V]) = + if not handle.closed: + handle.closed = true + +iterator queryIter*[K, V]( + handle: var FsQueryHandle[K, V] +): ?!DbQueryResponse[K, V] = + let root = $(handle.env.self.root) + let basePath = $(handle.env.basePath) + + for path in basePath.dirIter(): + if handle.cancel: + break + + var + basePath = $handle.env.basePath + keyPath = basePath + + keyPath.removePrefix(root) + keyPath = keyPath / path.changeFileExt("") + keyPath = keyPath.replace("\\", "/") + + let + flres = (basePath / path).absolutePath().catch + if flres.isErr(): + yield DbQueryResponse[K,V].failure flres.error() + continue + + let + key = K.toKey($Key.init(keyPath).expect("valid key")) + data = + if handle.query.value: + let res = readFile[K, V](handle.env.self, key, flres.get) + if res.isErr(): + yield DbQueryResponse[K,V].failure res.error() + continue + res.get() + else: + V.new() + + yield success (key.some, data) + handle.close() + +proc newFSBackend*[K,V](root: string, + depth = 2, + caseSensitive = true, + ignoreProtected = false + ): ?!FSBackend[K,V] = + + let root = ? ( + block: + if root.isAbsolute: root + else: getCurrentDir() / root).catch + + if not dirExists(root): + return failure "directory does not exist: " & root + + success FSBackend[K,V]( + root: DataBuffer.new root, + ignoreProtected: ignoreProtected, + depth: depth) diff --git a/datastore/threads/semaphore.nim b/datastore/threads/semaphore.nim new file mode 100644 index 00000000..5087e3f9 --- /dev/null +++ b/datastore/threads/semaphore.nim @@ -0,0 +1,58 @@ +import std/atomics +import std/locks + +type + Semaphore* = object + count: int + size: int + lock {.align: 64.}: Lock + cond: Cond + +func `=`*(dst: var Semaphore, src: Semaphore) {.error: "A semaphore cannot be copied".} +func `=sink`*(dst: var Semaphore, src: Semaphore) {.error: "A semaphore cannot be moved".} + +proc init*(_: type Semaphore, count: uint): Semaphore = + var + lock: Lock + cond: Cond + + lock.initLock() + cond.initCond() + + Semaphore(count: count.int, size: count.int, lock: lock, cond: cond) + +proc `=destroy`*(self: var Semaphore) = + self.lock.deinitLock() + self.cond.deinitCond() + +proc count*(self: var Semaphore): int = + self.count + +proc size*(self: var Semaphore): int = + self.size + +proc acquire*(self: var Semaphore) {.inline.} = + self.lock.acquire() + while self.count <= 0: + self.cond.wait(self.lock) + + self.count -= 1 + self.lock.release() + +proc release*(self: var Semaphore) {.inline.} = + self.lock.acquire() + if self.count <= 0: + self.count += 1 + self.cond.signal() + + doAssert not (self.count > self.size), + "Semaphore count is greater than size: " & $self.size & " count is: " & $self.count + + self.lock.release() + +template withSemaphore*(self: var Semaphore, body: untyped) = + self.acquire() + try: + body + finally: + self.release() diff --git a/datastore/threads/sqlbackend.nim b/datastore/threads/sqlbackend.nim new file mode 100644 index 00000000..fdbce970 --- /dev/null +++ b/datastore/threads/sqlbackend.nim @@ -0,0 +1,242 @@ +import std/times +import std/options + +import 
diff --git a/datastore/threads/sqlbackend.nim b/datastore/threads/sqlbackend.nim
new file mode 100644
index 00000000..fdbce970
--- /dev/null
+++ b/datastore/threads/sqlbackend.nim
@@ -0,0 +1,242 @@
+import std/times
+import std/options
+
+import pkg/questionable
+import pkg/questionable/results
+import pkg/sqlite3_abi
+from pkg/stew/results as stewResults import isErr
+import pkg/upraises
+
+import ./backend
+import ../sql/sqlitedsdb
+
+export backend, sqlitedsdb
+
+push: {.upraises: [].}
+
+type
+  SQLiteBackend*[K: DbKey, V: DbVal] = object
+    db: SQLiteDsDb[K, V]
+
+proc path*[K,V](self: SQLiteBackend[K,V]): string =
+  $self.db.dbPath
+
+proc readOnly*[K,V](self: SQLiteBackend[K,V]): bool = self.db.readOnly
+
+proc timestamp*(t = epochTime()): int64 =
+  (t * 1_000_000).int64
+
+proc has*[K,V](self: SQLiteBackend[K,V], key: K): ?!bool =
+  var
+    exists = false
+    key = key
+
+  proc onData(s: RawStmtPtr) =
+    exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool
+
+  if err =? self.db.containsStmt.query((key), onData).errorOption:
+    return failure err
+
+  return success exists
+
+proc delete*[K,V](self: SQLiteBackend[K,V], key: K): ?!void =
+  return self.db.deleteStmt.exec((key))
+
+proc delete*[K,V](self: SQLiteBackend[K,V], keys: openArray[K]): ?!void =
+  if err =? self.db.beginStmt.exec().errorOption:
+    return failure(err)
+
+  for key in keys:
+    if err =? self.db.deleteStmt.exec((key)).errorOption:
+      if err =? self.db.rollbackStmt.exec().errorOption:
+        return failure err.msg
+
+      return failure err.msg
+
+  if err =? self.db.endStmt.exec().errorOption:
+    return failure err.msg
+
+  return success()
+
+proc get*[K,V](self: SQLiteBackend[K,V], key: K): ?!V =
+  # see comment in ./filesystem_datastore re: finer control of memory
+  # allocation in `proc get`, could apply here as well if bytes were read
+  # incrementally with `sqlite3_blob_read`
+
+  var bytes: V
+
+  proc onData(s: RawStmtPtr) =
+    bytes = dataCol[V](self.db.getDataCol)
+
+  if err =? self.db.getStmt.query((key), onData).errorOption:
+    return failure(err)
+
+  if bytes.len <= 0:
+    return failure(
+      newException(DatastoreKeyNotFound, "Key doesn't exist"))
+
+  return success bytes
+
+proc put*[K,V](self: SQLiteBackend[K,V], key: K, data: V): ?!void =
+  return self.db.putStmt.exec((key, data, timestamp()))
+
+proc put*[K,V](self: SQLiteBackend[K,V], batch: openArray[DbBatchEntry[K,V]]): ?!void =
+  if err =? self.db.beginStmt.exec().errorOption:
+    return failure err
+
+  for entry in batch:
+    let putStmt = self.db.putStmt
+    let item = (entry.key, entry.data, timestamp())
+    if err =? putStmt.exec(item).errorOption:
+      if err =? self.db.rollbackStmt.exec().errorOption:
+        return failure err
+
+      return failure err
+
+  if err =? self.db.endStmt.exec().errorOption:
+    return failure err
+
+  return success()
+
+proc close*[K,V](self: SQLiteBackend[K,V]): ?!void =
+  self.db.close()
+
+  return success()
+
+type
+  SqQueryHandle*[K, V] = object
+    query*: DbQuery[K]
+    cancel*: bool
+    closed*: bool
+    env*: RawStmtPtr
+
+proc query*[K,V](
+  self: SQLiteBackend[K,V],
+  query: DbQuery[K]
+): Result[SqQueryHandle[K,V], ref CatchableError] =
+
+  var
+    queryStr = if query.value:
+        QueryStmtDataIdStr
+      else:
+        QueryStmtIdStr
+
+  case query.sort:
+  of Descending:
+    queryStr &= QueryStmtOrderDescending
+  of Ascending:
+    queryStr &= QueryStmtOrderAscending
+
+  if query.limit != 0:
+    queryStr &= QueryStmtLimit
+
+  if query.offset != 0:
+    queryStr &= QueryStmtOffset
+
+  let
+    queryStmt = ? QueryStmt.prepare(self.db.env, queryStr)
+
+    s = RawStmtPtr(queryStmt)
+    queryKey = $query.key & "*"
+
+  var
+    v = sqlite3_bind_text(
+      s, 1.cint, queryKey.cstring, queryKey.len().cint, SQLITE_TRANSIENT_GCSAFE)
+
+  if not (v == SQLITE_OK):
+    return failure newException(DatastoreError, $sqlite3_errstr(v))
+
+  if query.limit != 0:
+    v = sqlite3_bind_int(s, 2.cint, query.limit.cint)
+
+    if not (v == SQLITE_OK):
+      return failure newException(DatastoreError, $sqlite3_errstr(v))
+
+  if query.offset != 0:
+    v = sqlite3_bind_int(s, 3.cint, query.offset.cint)
+
+    if not (v == SQLITE_OK):
+      return failure newException(DatastoreError, $sqlite3_errstr(v))
+
+  success SqQueryHandle[K,V](query: query, env: s)
+
+proc close*[K,V](handle: var SqQueryHandle[K,V]) =
+  if not handle.closed:
+    handle.closed = true
+    discard sqlite3_reset(handle.env)
+    discard sqlite3_clear_bindings(handle.env)
+    handle.env.dispose()
+
+iterator queryIter*[K, V](
+  handle: var SqQueryHandle[K, V]
+): ?!DbQueryResponse[K, V] =
+  while not handle.cancel:
+
+    let v = sqlite3_step(handle.env)
+
+    case v
+    of SQLITE_ROW:
+      let
+        key = K.toKey(sqlite3_column_text_not_null(handle.env, QueryStmtIdCol))
+
+        blob: ?pointer =
+          if handle.query.value: sqlite3_column_blob(handle.env, QueryStmtDataCol).some
+          else: pointer.none
+
+      # detect out-of-memory error
+      # see the conversion table and final paragraph of:
+      # https://www.sqlite.org/c3ref/column_blob.html
+      # see also https://www.sqlite.org/rescode.html
+
+      # the "data" column can be NULL so in order to detect an out-of-memory
+      # error it is necessary to check that the result is a null pointer and
+      # that the result code is an error code
+      if blob.isSome and blob.get().isNil:
+        let v = sqlite3_errcode(sqlite3_db_handle(handle.env))
+
+        if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
+          handle.cancel = true
+          yield DbQueryResponse[K,V].failure newException(DatastoreError, $sqlite3_errstr(v))
+          break # stop here; falling through would dereference the nil blob
+
+      let
+        dataLen = sqlite3_column_bytes(handle.env, QueryStmtDataCol)
+        data =
+          if blob.isSome:
+            # cast the inner pointer, not the Option wrapper
+            let arr = cast[ptr UncheckedArray[byte]](blob.get())
+            V.toVal(arr.toOpenArray(0, dataLen-1))
+          else:
+            var empty: array[0, byte]
+            V.toVal(empty.toOpenArray(0,-1))
+
+      yield success (key.some, data)
+    of SQLITE_DONE:
+      handle.close()
+      break
+    else:
+      handle.cancel = true
+      yield DbQueryResponse[K,V].failure newException(DatastoreError, $sqlite3_errstr(v))
+      break
+
+  handle.close()
+
+
+proc contains*[K,V](self: SQLiteBackend[K,V], key: K): bool =
+  return self.has(key).get()
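+
+# Usage sketch (illustrative only): drain a query handle. `dbQuery`, `KeyId`
+# and `DataBuffer` come from this PR's backend/threads modules.
+#
+#   let ds = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
+#   ds.put(KeyId.new "/a/1", DataBuffer.new "v").tryGet()
+#
+#   var handle = ds.query(dbQuery(key = KeyId.new "/a", value = true)).tryGet()
+#   for res in handle.queryIter():
+#     let (key, data) = res.tryGet()
+#     echo $key.get, " => ", data.len, " bytes"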
+
+
+proc newSQLiteBackend*[K,V](
+  path: string,
+  readOnly = false): ?!SQLiteBackend[K,V] =
+
+  let
+    flags =
+      if readOnly: SQLITE_OPEN_READONLY
+      else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
+
+  success SQLiteBackend[K,V](db: ? SQLiteDsDb[K,V].open(path, flags))
+
+
+proc newSQLiteBackend*[K,V](db: SQLiteDsDb[K,V]
+                           ): ?!SQLiteBackend[K,V] =
+
+  success SQLiteBackend[K,V](db: db)
diff --git a/datastore/threads/threadproxy.nim b/datastore/threads/threadproxy.nim
new file mode 100644
index 00000000..3e2df5b9
--- /dev/null
+++ b/datastore/threads/threadproxy.nim
@@ -0,0 +1,381 @@
+
+when not compileOption("threads"):
+  {.error: "This module requires --threads:on compilation flag".}
+
+import pkg/upraises
+
+push: {.upraises: [].}
+
+import std/tables
+import std/locks
+import std/sugar
+
+import pkg/chronos
+import pkg/chronos/threadsync
+import pkg/questionable
+import pkg/questionable/results
+import pkg/taskpools
+import std/isolation
+import pkg/chronicles
+import pkg/threading/smartptrs
+
+import ../key
+import ../query
+import ./backend
+# import ./fsbackend
+# import ./sqlbackend
+
+import ./asyncsemaphore
+import ./databuffer
+import ./threadresult
+
+export threadresult, smartptrs, isolation, chronicles
+
+logScope:
+  topics = "datastore threadproxy"
+
+type
+
+  TaskCtxObj*[T: ThreadTypes] = object
+    res*: ThreadResult[T]
+    signal: ThreadSignalPtr
+    running*: bool ## used to mark when a task worker is running
+    cancelled*: bool ## used to cancel a task before it's started
+    nextSignal: ThreadSignalPtr
+
+  TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
+    ## Task context object.
+    ## This is a SharedPtr to make the query iter simpler
+
+  ThreadProxy*[BT] = object
+    tp: Taskpool
+    backend*: BT
+    semaphore: AsyncSemaphore # semaphore is used for backpressure
+                              # to avoid exhausting file descriptors
+
+proc newTaskCtx*[T](tp: typedesc[T],
+                    signal: ThreadSignalPtr,
+                    nextSignal: ThreadSignalPtr = nil): TaskCtx[T] =
+  newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal))
+
+proc setCancelled[T](ctx: TaskCtx[T]) =
+  ctx[].cancelled = true
+
+proc setRunning[T](ctx: TaskCtx[T]): bool =
+  if ctx[].cancelled:
+    return false
+  ctx[].running = true
+  return true
+
+proc setDone[T](ctx: TaskCtx[T]) =
+  ctx[].running = false
+
+proc acquireSignal(): ?!ThreadSignalPtr =
+  let signal = ThreadSignalPtr.new()
+  if signal.isErr():
+    failure (ref CatchableError)(msg: "failed to acquire ThreadSignalPtr: " &
+      signal.error())
+  else:
+    success signal.get()
+
+template executeTask*[T](ctx: TaskCtx[T], blk: untyped) =
+  ## executes a task on a worker thread and handles cleanup after cancels/errors
+  ##
+  try:
+    if not ctx.setRunning():
+      return
+
+    ## run backend command
+    let res = `blk`
+    if res.isOk():
+      when T is void:
+        ctx[].res.ok()
+      else:
+        ctx[].res.ok(res.get())
+    else:
+      ctx[].res.err res.error().toThreadErr()
+
+  except CatchableError as exc:
+    trace "Unexpected exception thrown in async task", exc = exc.msg
+    ctx[].res.err exc.toThreadErr()
+  # except Exception as exc:
+  #   trace "Unexpected defect thrown in async task", exc = exc.msg
+  #   ctx[].res.err exc.toThreadErr()
+  finally:
+    ctx.setDone()
+    discard ctx[].signal.fireSync()
+
+template dispatchTaskWrap[BT](self: ThreadProxy[BT],
+                              signal: ThreadSignalPtr,
+                              blk: untyped
+                             ): auto =
+  var ds {.used, inject.} = self.backend
+  proc runTask() =
+    `blk`
+  runTask()
+  await wait(ctx[].signal)
+
+template dispatchTask*[BT](self: ThreadProxy[BT],
+                           signal: ThreadSignalPtr,
+                           blk: untyped
+                          ): auto =
+  ## handles dispatching a task from an async context;
+  ## `blk` is the actions, it has `ctx` and `ds` variables in scope;
+  ## note that `ds` is a generic
+  try:
+    dispatchTaskWrap[BT](self, signal, blk)
+  except CancelledError as exc:
+    trace "Cancelling thread future!", exc = exc.msg
+    ctx.setCancelled()
+    raise exc
+  except CatchableError as exc:
+    ctx.setCancelled()
+    raise exc
+  finally:
+    discard ctx[].signal.close()
+    self.semaphore.release()
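+
+# Adding a new proxied operation follows the same pattern throughout this
+# module: a `*Task` proc wraps the backend call in `executeTask` and is
+# spawned from async code via `dispatchTask`. A hypothetical sketch
+# (`count`/`countTask` are illustrative names, not part of this PR):
+#
+#   proc countTask[T, DB](ctx: TaskCtx[T], ds: DB) {.gcsafe.} =
+#     mixin count
+#     executeTask(ctx):
+#       count(ds)                  # any ?!int-returning backend call
+#
+#   proc count*[BT](self: ThreadProxy[BT]): Future[?!int] {.async.} =
+#     await self.semaphore.acquire()
+#     let signal = acquireSignal().get()
+#     let ctx = newTaskCtx(int, signal = signal)
+#     dispatchTask(self, signal):
+#       self.tp.spawn countTask(ctx, ds)
+#     return ctx[].res.toRes(v => v)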
+
+proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
+  ## run backend command
+  mixin has
+  executeTask(ctx):
+    has(ds, key)
+
+proc has*[BT](self: ThreadProxy[BT],
+              key: Key): Future[?!bool] {.async.} =
+  await self.semaphore.acquire()
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
+
+  let ctx = newTaskCtx(bool, signal = signal)
+  dispatchTask(self, signal):
+    let key = KeyId.new key.id()
+    self.tp.spawn hasTask(ctx, ds, key)
+  return ctx[].res.toRes(v => v)
+
+proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
+                       key: KeyId) {.gcsafe.} =
+  ## run backend command
+  mixin delete
+  executeTask(ctx):
+    delete(ds, key)
+
+proc delete*[BT](self: ThreadProxy[BT],
+                 key: Key): Future[?!void] {.async.} =
+  ## delete key
+  await self.semaphore.acquire()
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
+
+  let ctx = newTaskCtx(void, signal = signal)
+  dispatchTask(self, signal):
+    let key = KeyId.new key.id()
+    self.tp.spawn deleteTask(ctx, ds, key)
+
+  return ctx[].res.toRes()
+
+proc delete*[BT](self: ThreadProxy[BT],
+                 keys: seq[Key]): Future[?!void] {.async.} =
+  ## delete batch
+  for key in keys:
+    if err =? (await self.delete(key)).errorOption:
+      return failure err
+
+  return success()
+
+
+proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
+                    key: KeyId,
+                    data: DataBuffer) {.gcsafe, nimcall.} =
+  mixin put
+  executeTask(ctx):
+    put(ds, key, data)
+
+proc put*[BT](self: ThreadProxy[BT],
+              key: Key,
+              data: seq[byte]): Future[?!void] {.async.} =
+  ## put key with data
+  await self.semaphore.acquire()
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
+
+  let ctx = newTaskCtx(void, signal = signal)
+  dispatchTask(self, signal):
+    let key = KeyId.new key.id()
+    let data = DataBuffer.new data
+    self.tp.spawn putTask(ctx, ds, key, data)
+
+  return ctx[].res.toRes()
+
+proc put*[E, DB](self: ThreadProxy[DB],
+                 batch: seq[E]): Future[?!void] {.async.} =
+  ## put batch data
+  for entry in batch:
+    if err =? (await self.put(entry.key, entry.data)).errorOption:
+      return failure err
+
+  return success()
+
+
+proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
+                 key: KeyId) {.gcsafe, nimcall.} =
+  ## run backend command
+  mixin get
+  executeTask(ctx):
+    let res = get(ds, key)
+    res
+
+proc get*[BT](self: ThreadProxy[BT],
+              key: Key,
+             ): Future[?!seq[byte]] {.async.} =
+  await self.semaphore.acquire()
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
+
+  let ctx = newTaskCtx(DataBuffer, signal = signal)
+  dispatchTask(self, signal):
+    let key = KeyId.new key.id()
+    self.tp.spawn getTask(ctx, ds, key)
+
+  return ctx[].res.toRes(v => v.toSequence())
+
+proc close*[BT](self: ThreadProxy[BT]): Future[?!void] {.async.} =
+  await self.semaphore.closeAll()
+  self.backend.close()
+
+type
+  QResult = DbQueryResponse[KeyId, DataBuffer]
+
+proc queryTask[DB](
+  ctx: TaskCtx[QResult],
+  ds: DB,
+  query: DbQuery[KeyId],
+) =
+  ## run query command
+  mixin queryIter
+  executeTask(ctx):
+    # we execute this all inside `executeTask`
+    # so we need to return a final result
+    let handleRes = query(ds, query)
+    if handleRes.isErr():
+      # set error and exit executeTask, which will fire final signal
+      (?!QResult).err(handleRes.error())
+    else:
+      # otherwise manually set an empty ok result
+      ctx[].res.ok (KeyId.none, DataBuffer(), )
+      discard ctx[].signal.fireSync()
+      if not ctx[].nextSignal.waitSync(10.seconds).get():
+        raise newException(DeadThreadDefect, "queryTask timed out")
+
+      var handle = handleRes.get()
+      for item in handle.queryIter():
+        # wait for next request from async thread
+
+        if ctx[].cancelled:
+          # cancel iter, then run next cycle so it'll finish and close
+          handle.cancel = true
+          continue
+        else:
+          ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
+            exc
+
+        discard ctx[].signal.fireSync()
+        if not ctx[].nextSignal.waitSync(10.seconds).get():
+          raise newException(DeadThreadDefect, "queryTask timed out")
+
+      # set final result
+      (?!QResult).ok((KeyId.none, DataBuffer()))
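+
+# The two signals implement a ping-pong handshake between the worker and the
+# async side: the worker fires `signal` after publishing each item and then
+# blocks on `nextSignal`; the async iterator awaits `signal`, consumes the
+# item, and fires `nextSignal` to let the worker produce the next one. A
+# stripped-down sketch of the same protocol (illustrative only):
+#
+#   proc worker(signal, nextSignal: ThreadSignalPtr) =
+#     for i in 0 ..< 3:
+#       # publish item i somewhere shared, then:
+#       discard signal.fireSync()
+#       discard nextSignal.waitSync(10.seconds)
+#
+#   proc consumer(signal, nextSignal: ThreadSignalPtr) {.async.} =
+#     for i in 0 ..< 3:
+#       await wait(signal)      # item i is now available
+#       await nextSignal.fire() # let the worker continue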
+
+proc query*[BT](self: ThreadProxy[BT],
+                q: Query
+               ): Future[?!QueryIter] {.async.} =
+  ## performs async query
+  ## keeps one thread running queryTask until finished
+  ##
+  await self.semaphore.acquire()
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
+  let nextSignal = acquireSignal().get()
+  # without nextSignal =? acquireSignal(), err:
+  #   return failure err
+  let ctx = newTaskCtx(QResult, signal = signal, nextSignal = nextSignal)
+
+  proc iterDispose() {.async.} =
+    ctx.setCancelled()
+    await ctx[].nextSignal.fire()
+    discard ctx[].signal.close()
+    discard ctx[].nextSignal.close()
+    self.semaphore.release()
+
+  try:
+    let query = dbQuery(
+      key = KeyId.new q.key.id(),
+      value = q.value, limit = q.limit, offset = q.offset, sort = q.sort)
+
+    # setup initial queryTask
+    dispatchTaskWrap(self, signal):
+      self.tp.spawn queryTask(ctx, ds, query)
+    await ctx[].nextSignal.fire()
+
+    var lock = newAsyncLock() # serialize querying under threads
+    var iter = QueryIter.new()
+    iter.dispose = proc (): Future[?!void] {.async.} =
+      await iterDispose()
+      success()
+
+    iter.next = proc(): Future[?!QueryResponse] {.async.} =
+      let ctx = ctx
+      try:
+        trace "About to query"
+        if lock.locked:
+          return failure (ref DatastoreError)(
+            msg: "Should always await query futures")
+        if iter.finished == true:
+          return failure (ref QueryEndedError)(
+            msg: "Calling next on a finished query!")
+
+        await wait(ctx[].signal)
+        if not ctx[].running:
+          iter.finished = true
+
+        defer:
+          await ctx[].nextSignal.fire()
+
+        if ctx[].res.isErr():
+          return err(ctx[].res.error())
+        else:
+          let qres = ctx[].res.get()
+          let key = qres.key.map(proc (k: KeyId): Key = k.toKey())
+          let data = qres.data.toSequence()
+          return (?!QueryResponse).ok((key: key, data: data))
+      except CancelledError as exc:
+        trace "Cancelling thread future!", exc = exc.msg
+        ctx.setCancelled()
+        await iterDispose() # todo: is this valid?
+        raise exc
+
+    return success iter
+  except CancelledError as exc:
+    trace "Cancelling thread future!", exc = exc.msg
+    ctx.setCancelled()
+    await iterDispose()
+    raise exc
+
+proc new*[DB](self: type ThreadProxy,
+              db: DB,
+              withLocks = static false,
+              tp: Taskpool
+             ): ?!ThreadProxy[DB] =
+  doAssert tp.numThreads > 1, "ThreadProxy requires at least 2 threads"
+
+  success ThreadProxy[DB](
+    tp: tp,
+    backend: db,
+    semaphore: AsyncSemaphore.new(tp.numThreads - 1)
+  )
diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim
new file mode 100644
index 00000000..6a6cb0ae
--- /dev/null
+++ b/datastore/threads/threadresult.nim
@@ -0,0 +1,75 @@
+import std/atomics
+import std/options
+import std/locks
+
+import pkg/questionable/results
+import pkg/results
+
+import ../types
+import ../query
+import ../key
+
+import ./backend
+import ./databuffer
+
+type
+  ErrorEnum* {.pure.} = enum
+    DatastoreErr,
+    DatastoreKeyNotFoundErr,
+    QueryEndedErr,
+    CatchableErr,
+    DefectErr
+
+  ThreadTypes* = void | bool | SomeInteger | DataBuffer | tuple | Atomic
+  ThreadResErr* = (ErrorEnum, DataBuffer)
+  ThreadResult*[T: ThreadTypes] = Result[T, ThreadResErr]
+
+
+converter toThreadErr*(e: ref Exception): ThreadResErr {.inline, raises: [].} =
+  if e of DatastoreKeyNotFound: (ErrorEnum.DatastoreKeyNotFoundErr, DataBuffer.new(e.msg))
+  elif e of QueryEndedError: (ErrorEnum.QueryEndedErr, DataBuffer.new(e.msg))
+  elif e of DatastoreError: (DatastoreErr, DataBuffer.new(e.msg))
+  elif e of CatchableError: (CatchableErr, DataBuffer.new(e.msg))
+  elif e of Defect: (DefectErr, DataBuffer.new(e.msg))
+  else: raise (ref Defect)(msg: e.msg)
+
+converter toExc*(e: ThreadResErr): ref CatchableError =
+  case e[0]:
+  of ErrorEnum.DatastoreKeyNotFoundErr: (ref DatastoreKeyNotFound)(msg: $e[1])
+  of ErrorEnum.QueryEndedErr: (ref QueryEndedError)(msg: $e[1])
+  of ErrorEnum.DatastoreErr: (ref DatastoreError)(msg: $e[1])
+  of ErrorEnum.CatchableErr: (ref CatchableError)(msg: $e[1])
+  of ErrorEnum.DefectErr: (ref CatchableError)(msg: "defect: " & $e[1])
+
+proc toRes*(res: ThreadResult[void]): ?!void =
+  res.mapErr() do(e: ThreadResErr) -> ref CatchableError:
+    e.toExc()
+
+proc toRes*[T,S](res: ThreadResult[T],
+                 m: proc(v: T): S = proc(v: T): T = v): ?!S =
+  # todo: cleaner way to do this?
+  if res.isErr():
+    result.err res.error().toExc()
+  else:
+    result.ok m(res.get())
+
+type
+  MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool]
+
+proc init*(sig: var MutexSignal) =
+  sig.lock.initLock()
+  sig.cond.initCond()
+  sig.open = true
+
+proc wait*(sig: var MutexSignal) =
+  withLock(sig.lock):
+    wait(sig.cond, sig.lock)
+
+proc fire*(sig: var MutexSignal) =
+  withLock(sig.lock):
+    signal(sig.cond)
+
+proc close*(sig: var MutexSignal) =
+  if sig.open:
+    sig.open = false # guard against double deinit on repeated close
+    sig.lock.deinitLock()
+    sig.cond.deinitCond()
diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim
index 4eca23be..8448f9bf 100644
--- a/datastore/tieredds.nim
+++ b/datastore/tieredds.nim
@@ -74,13 +74,16 @@ method get*(
     bytes: seq[byte]

   for store in self.stores:
-    without bytes =? (await store.get(key)):
-      continue
+    without bytes =? (await store.get(key)), err:
+      if err of DatastoreKeyNotFound:
+        continue
+      else:
+        return failure(err)

     if bytes.len <= 0:
       continue

-    # put found data into stores logically in front of the current store
+    # put found data into stores in front of the current store
     for s in self.stores:
       if s == store: break
       if(
@@ -88,7 +91,10 @@ method get*(
         res.isErr):
         return failure res.error

-    return success bytes
+    if bytes.len > 0:
+      return success bytes
+
+  return failure (ref DatastoreKeyNotFound)(msg: "Key not found")

 method put*(
   self: TieredDatastore,
diff --git a/datastore/types.nim b/datastore/types.nim
index b019cdb0..473db527 100644
--- a/datastore/types.nim
+++ b/datastore/types.nim
@@ -6,5 +6,6 @@ const
 type
   DatastoreError* = object of CatchableError
   DatastoreKeyNotFound* = object of DatastoreError
+  QueryEndedError* = object of DatastoreError

   Datastore* = ref object of RootObj
diff --git a/nimble.lock b/nimble.lock
new file mode 100644
index 00000000..3a208e32
--- /dev/null
+++ b/nimble.lock
@@ -0,0 +1,214 @@
+{
+  "version": 2,
+  "packages": {
+    "upraises": {
+      "version": "0.1.0",
+      "vcsRevision": "d9f268db1021959fe0f2c7a5e49fba741f9932a0",
+      "url": "https://github.com/markspanbroek/upraises",
+      "downloadMethod": "git",
+      "dependencies": [],
+      "checksums": {
+        "sha1": "176234f808b44a0be763df706ed634d6e8df17bb"
+      }
+    },
+    "results": {
+      "version": "0.4.0",
+      "vcsRevision": "f3c666a272c69d70cb41e7245e7f6844797303ad",
+      "url": "https://github.com/arnetheduck/nim-results",
+      "downloadMethod": "git",
+      "dependencies": [],
+      "checksums": {
+        "sha1": "51e08ca9524db98dc909fb39192272cc2b5451c7"
+      }
+    },
+    "unittest2": {
+      "version": "0.1.0",
+      "vcsRevision": "2300fa9924a76e6c96bc4ea79d043e3a0f27120c",
+      "url": "https://github.com/status-im/nim-unittest2",
+      "downloadMethod": "git",
+      "dependencies": [],
+      "checksums": {
+        "sha1": "914cf9a380c83b2ae40697981e5d94903505e87e"
+      }
+    },
+    "stew": {
+      "version": "0.1.0",
+      "vcsRevision": "3159137d9a3110edb4024145ce0ba778975de40e",
+      "url": "https://github.com/status-im/nim-stew",
+      "downloadMethod": "git",
+      "dependencies": [
+        "results",
+        "unittest2"
+      ],
+      "checksums": {
+        "sha1": "4ab494e272e997011853faddebe9e55183613776"
+      }
+    },
+    "faststreams": {
+      "version": "0.3.0",
+      "vcsRevision": "720fc5e5c8e428d9d0af618e1e27c44b42350309",
+      "url": "https://github.com/status-im/nim-faststreams",
+      "downloadMethod": "git",
+ "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "ab178ba25970b95d953434b5d86b4d60396ccb64" + } + }, + "serialization": { + "version": "0.2.0", + "vcsRevision": "4bdbc29e54fe54049950e352bb969aab97173b35", + "url": "https://github.com/status-im/nim-serialization", + "downloadMethod": "git", + "dependencies": [ + "faststreams", + "unittest2", + "stew" + ], + "checksums": { + "sha1": "c8c99a387aae488e7008aded909ebfe662e74450" + } + }, + "sqlite3_abi": { + "version": "3.40.1.1", + "vcsRevision": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3", + "url": "https://github.com/arnetheduck/nim-sqlite3-abi", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "8e91db8156a82383d9c48f53b33e48f4e93077b1" + } + }, + "testutils": { + "version": "0.5.0", + "vcsRevision": "dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34", + "url": "https://github.com/status-im/nim-testutils", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "756d0757c4dd06a068f9d38c7f238576ba5ee897" + } + }, + "json_serialization": { + "version": "0.1.5", + "vcsRevision": "85b7ea093cb85ee4f433a617b97571bd709d30df", + "url": "https://github.com/status-im/nim-json-serialization", + "downloadMethod": "git", + "dependencies": [ + "serialization", + "stew" + ], + "checksums": { + "sha1": "c6b30565292acf199b8be1c62114726e354af59e" + } + }, + "chronicles": { + "version": "0.10.3", + "vcsRevision": "32ac8679680ea699f7dbc046e8e0131cac97d41a", + "url": "https://github.com/status-im/nim-chronicles", + "downloadMethod": "git", + "dependencies": [ + "testutils", + "json_serialization" + ], + "checksums": { + "sha1": "79f09526d4d9b9196dd2f6a75310d71a890c4f88" + } + }, + "asynctest": { + "version": "0.3.2", + "vcsRevision": "a236a5f0f3031573ac2cb082b63dbf6e170e06e7", + "url": "https://github.com/markspanbroek/asynctest", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "0ef50d086659835b0a23a4beb77cb11747695448" + } + }, + "httputils": { + "version": "0.3.0", + "vcsRevision": "87b7cbf032c90b9e6b446081f4a647e950362cec", + "url": "https://github.com/status-im/nim-http-utils", + "downloadMethod": "git", + "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "72a138157a9951f0986a9c4afc8c9a83ce3979a8" + } + }, + "taskpools": { + "version": "0.0.4", + "vcsRevision": "b3673c7a7a959ccacb393bd9b47e997bbd177f5a", + "url": "https://github.com/status-im/nim-taskpools", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "e43c5170d4e9ef1b27dd0956ffa0db46f992f9a6" + } + }, + "pretty": { + "version": "0.1.0", + "vcsRevision": "60850f8c595d4f0b9e55f3a99c7c471244a3182a", + "url": "https://github.com/treeform/pretty", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "b0a1cf8607d169050acd2bdca8d76fe850c23648" + } + }, + "bearssl": { + "version": "0.2.1", + "vcsRevision": "e4157639db180e52727712a47deaefcbbac6ec86", + "url": "https://github.com/status-im/nim-bearssl", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "a5086fd5c0af2b852f34c0cc6e4cff93a98f97ec" + } + }, + "chronos": { + "version": "3.2.0", + "vcsRevision": "0277b65be2c7a365ac13df002fba6e172be55537", + "url": "https://github.com/status-im/nim-chronos", + "downloadMethod": "git", + "dependencies": [ + "stew", + "bearssl", + "httputils", + "unittest2" + ], + "checksums": { + "sha1": "78a41db7fb05b937196d4fa2f1e3fb4353b36a07" + } + }, + "questionable": { + "version": "0.10.10", + "vcsRevision": 
"b3cf35ac450fd42c9ea83dc084f5cba2efc55da3", + "url": "https://github.com/markspanbroek/questionable", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "8bb23a05d7f21619010471aa009e27d3fa73d93a" + } + }, + "threading": { + "version": "0.2.0", + "vcsRevision": "bcc284991ba928d1ed299a81a93b7113cc7de04f", + "url": "https://github.com/nim-lang/threading", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "08dfc46cedc3fc202e5de6ef80a655598331eb89" + } + } + }, + "tasks": {} +} diff --git a/tests/datastore/dscommontests.nim b/tests/datastore/dscommontests.nim index 1e0353cb..beda975e 100644 --- a/tests/datastore/dscommontests.nim +++ b/tests/datastore/dscommontests.nim @@ -3,8 +3,9 @@ import std/options import pkg/asynctest import pkg/chronos import pkg/stew/results +import pkg/questionable/results -import pkg/datastore +import pkg/datastore/datastore proc basicStoreTests*( ds: Datastore, @@ -56,3 +57,9 @@ proc basicStoreTests*( for k in batch: check: not (await ds.has(k)).tryGet + + test "handle missing key": + let key = Key.init("/missing/key").tryGet() + + expect(DatastoreKeyNotFound): + discard (await ds.get(key)).tryGet() # non existing key diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim index 4f0a15d8..0bba6e45 100644 --- a/tests/datastore/querycommontests.nim +++ b/tests/datastore/querycommontests.nim @@ -1,11 +1,4 @@ -import std/options -import std/sequtils -from std/algorithm import sort, reversed -import pkg/asynctest -import pkg/chronos -import pkg/stew/results -import pkg/stew/byteutils import pkg/datastore @@ -36,9 +29,14 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var res: seq[QueryResponse] + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + res.add((key, val)) + res check: res.len == 3 @@ -63,9 +61,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 3 @@ -90,9 +99,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 2 @@ -117,9 +137,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = iter = (await ds.query(q)).tryGet var - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res res.sort do (a, b: QueryResponse) -> int: cmp(a.key.get.id, b.key.get.id) @@ -152,9 +183,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - 
.filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 10 @@ -175,9 +217,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 10 @@ -198,9 +251,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 5 @@ -237,9 +301,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = kvs = kvs.reversed let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 100 diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim deleted file mode 100644 index c629eb0c..00000000 --- a/tests/datastore/sql/testsqliteds.nim +++ /dev/null @@ -1,89 +0,0 @@ -import std/options -import std/os -import std/sequtils -from std/algorithm import sort, reversed - -import pkg/asynctest -import pkg/chronos -import pkg/stew/results -import pkg/stew/byteutils - -import pkg/datastore/sql/sqliteds - -import ../dscommontests -import ../querycommontests - -suite "Test Basic SQLiteDatastore": - let - ds = SQLiteDatastore.new(Memory).tryGet() - key = Key.init("a:b/c/d:e").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - teardownAll: - (await ds.close()).tryGet() - - basicStoreTests(ds, key, bytes, otherBytes) - -suite "Test Read Only SQLiteDatastore": - let - path = currentSourcePath() # get this file's name - basePath = "tests_data" - basePathAbs = path.parentDir / basePath - filename = "test_store" & DbExt - dbPathAbs = basePathAbs / filename - key = Key.init("a:b/c/d:e").tryGet() - bytes = "some bytes".toBytes - - var - dsDb: SQLiteDatastore - readOnlyDb: SQLiteDatastore - - setupAll: - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) - createDir(basePathAbs) - - dsDb = SQLiteDatastore.new(path = dbPathAbs).tryGet() - readOnlyDb = SQLiteDatastore.new(path = dbPathAbs, readOnly = true).tryGet() - - teardownAll: - (await dsDb.close()).tryGet() - (await readOnlyDb.close()).tryGet() - - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) - - test "put": - check: - (await readOnlyDb.put(key, bytes)).isErr - - (await dsDb.put(key, bytes)).tryGet() - - test "get": - check: - (await readOnlyDb.get(key)).tryGet() == bytes - (await dsDb.get(key)).tryGet() == bytes - - test "delete": - check: - (await readOnlyDb.delete(key)).isErr - - (await dsDb.delete(key)).tryGet() - - test "contains": - check: - not (await readOnlyDb.has(key)).tryGet() - not (await dsDb.has(key)).tryGet() - -suite "Test Query": 
- var - ds: SQLiteDatastore - - setup: - ds = SQLiteDatastore.new(Memory).tryGet() - - teardown: - (await ds.close()).tryGet - - queryTests(ds) diff --git a/tests/datastore/sql/testsqlitedsdb.nim b/tests/datastore/sql/testsqlitedsdb.nim index d1049331..2438d366 100644 --- a/tests/datastore/sql/testsqlitedsdb.nim +++ b/tests/datastore/sql/testsqlitedsdb.nim @@ -7,7 +7,7 @@ import pkg/stew/byteutils import pkg/sqlite3_abi import pkg/datastore/key import pkg/datastore/sql/sqlitedsdb -import pkg/datastore/sql/sqliteds +import pkg/datastore/threads/sqlbackend suite "Test Open SQLite Datastore DB": let @@ -28,7 +28,7 @@ suite "Test Open SQLite Datastore DB": test "Should create and open datastore DB": let - dsDb = SQLiteDsDb.open( + dsDb = SQLiteDsDb[string, seq[byte]].open( path = dbPathAbs, flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet() @@ -40,7 +40,7 @@ suite "Test Open SQLite Datastore DB": test "Should open existing DB": let - dsDb = SQLiteDsDb.open( + dsDb = SQLiteDsDb[string, seq[byte]].open( path = dbPathAbs, flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet() @@ -55,7 +55,7 @@ suite "Test Open SQLite Datastore DB": fileExists(dbPathAbs) let - dsDb = SQLiteDsDb.open( + dsDb = SQLiteDsDb[string, seq[byte]].open( path = dbPathAbs, flags = SQLITE_OPEN_READONLY).tryGet() @@ -66,7 +66,7 @@ suite "Test Open SQLite Datastore DB": removeDir(basePathAbs) check: not fileExists(dbPathAbs) - SQLiteDsDb.open(path = dbPathAbs).isErr + SQLiteDsDb[string, seq[byte]].open(path = dbPathAbs).isErr suite "Test SQLite Datastore DB operations": let @@ -81,19 +81,19 @@ suite "Test SQLite Datastore DB operations": otherData = "some other data".toBytes var - dsDb: SQLiteDsDb - readOnlyDb: SQLiteDsDb + dsDb: SQLiteDsDb[string, seq[byte]] + readOnlyDb: SQLiteDsDb[string, seq[byte]] setupAll: removeDir(basePathAbs) require(not dirExists(basePathAbs)) createDir(basePathAbs) - dsDb = SQLiteDsDb.open( + dsDb = SQLiteDsDb[string, seq[byte]].open( path = dbPathAbs, flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet() - readOnlyDb = SQLiteDsDb.open( + readOnlyDb = SQLiteDsDb[string, seq[byte]].open( path = dbPathAbs, flags = SQLITE_OPEN_READONLY).tryGet() @@ -111,12 +111,10 @@ suite "Test SQLite Datastore DB operations": dsDb.putStmt.exec((key.id, data, timestamp())).tryGet() test "Should select key": - let - dataCol = dsDb.getDataCol var bytes: seq[byte] proc onData(s: RawStmtPtr) = - bytes = dataCol() + bytes = dataCol[seq[byte]](dsDb.getDataCol) check: dsDb.getStmt.query((key.id), onData).tryGet() @@ -129,12 +127,10 @@ suite "Test SQLite Datastore DB operations": dsDb.putStmt.exec((key.id, otherData, timestamp())).tryGet() test "Should select updated key": - let - dataCol = dsDb.getDataCol var bytes: seq[byte] proc onData(s: RawStmtPtr) = - bytes = dataCol() + bytes = dataCol[seq[byte]](dsDb.getDataCol) check: dsDb.getStmt.query((key.id), onData).tryGet() diff --git a/tests/datastore/testasyncsemaphore.nim b/tests/datastore/testasyncsemaphore.nim new file mode 100644 index 00000000..3c8bb84b --- /dev/null +++ b/tests/datastore/testasyncsemaphore.nim @@ -0,0 +1,206 @@ +{.used.} + +# Nim-Libp2p +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. 
+ +import random + +import pkg/chronos +import pkg/asynctest + +import pkg/datastore/threads/asyncsemaphore + +randomize() + +suite "AsyncSemaphore": + test "Should acquire": + let sema = AsyncSemaphore.new(3) + + await sema.acquire() + await sema.acquire() + await sema.acquire() + + check sema.count == 0 + + test "Should release": + let sema = AsyncSemaphore.new(3) + + await sema.acquire() + await sema.acquire() + await sema.acquire() + + check sema.count == 0 + sema.release() + sema.release() + sema.release() + check sema.count == 3 + + test "Should queue acquire": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + let fut = sema.acquire() + + check sema.count == 0 + sema.release() + sema.release() + check sema.count == 1 + + await sleepAsync(10.millis) + check fut.finished() + + test "Should keep count == size": + let sema = AsyncSemaphore.new(1) + sema.release() + sema.release() + sema.release() + check sema.count == 1 + + test "Should tryAcquire": + let sema = AsyncSemaphore.new(1) + await sema.acquire() + check sema.tryAcquire() == false + + test "Should tryAcquire and acquire": + let sema = AsyncSemaphore.new(4) + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.count == 0 + + let fut = sema.acquire() + check fut.finished == false + check sema.count == 0 + + sema.release() + sema.release() + sema.release() + sema.release() + sema.release() + + check fut.finished == true + check sema.count == 4 + + test "Should restrict resource access": + let sema = AsyncSemaphore.new(3) + var resource = 0 + + proc task() {.async.} = + try: + await sema.acquire() + resource.inc() + check resource > 0 and resource <= 3 + let sleep = rand(0..10).millis + # echo sleep + await sleepAsync(sleep) + finally: + resource.dec() + sema.release() + + var tasks: seq[Future[void]] + for i in 0..<10: + tasks.add(task()) + + await allFutures(tasks) + + test "Should cancel sequential semaphore slot": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + + let + tmp = sema.acquire() + tmp2 = sema.acquire() + check: + not tmp.finished() + not tmp2.finished() + + tmp.cancel() + sema.release() + + check tmp2.finished() + + sema.release() + + check await sema.acquire().withTimeout(10.millis) + + test "Should handle out of order cancellations": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() # 1st acquire + let tmp1 = sema.acquire() # 2nd acquire + check not tmp1.finished() + + let tmp2 = sema.acquire() # 3rd acquire + check not tmp2.finished() + + let tmp3 = sema.acquire() # 4th acquire + check not tmp3.finished() + + # up to this point, we've called acquire 4 times + tmp1.cancel() # 1st release (implicit) + tmp2.cancel() # 2nd release (implicit) + + check not tmp3.finished() # check that we didn't release the wrong slot + + sema.release() # 3rd release (explicit) + check tmp3.finished() + + sema.release() # 4th release + check await sema.acquire().withTimeout(10.millis) + + test "Should properly handle timeouts and cancellations": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + check not(await sema.acquire().withTimeout(1.millis)) # Should not acquire but cancel + sema.release() + + check await sema.acquire().withTimeout(10.millis) + + test "Should handle forceAcquire properly": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + check not(await sema.acquire().withTimeout(1.millis)) # Should not acquire but cancel + + let + fut1 = sema.acquire() + fut2 = sema.acquire() 
+ + sema.forceAcquire() + sema.release() + + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + not fut2.finished() + + sema.release() + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + fut2.finished() + + + sema.forceAcquire() + sema.forceAcquire() + + let + fut3 = sema.acquire() + fut4 = sema.acquire() + fut5 = sema.acquire() + sema.release() + sema.release() + await sleepAsync(1.millis) + check: + fut3.finished() + fut4.finished() + not fut5.finished() diff --git a/tests/datastore/testdatabuffer.nim b/tests/datastore/testdatabuffer.nim new file mode 100644 index 00000000..ee8bc117 --- /dev/null +++ b/tests/datastore/testdatabuffer.nim @@ -0,0 +1,139 @@ +import std/options +import std/sequtils +import std/algorithm +import std/locks +import std/os +import pkg/stew/byteutils +import pkg/unittest2 +import pkg/questionable +import pkg/questionable/results +import pkg/datastore/key + +include ../../datastore/threads/databuffer + +var + shareVal: DataBuffer + lock: Lock + cond: Cond + +var threads: array[2,Thread[int]] + +proc thread1(val: int) {.thread.} = + echo "thread1" + {.cast(gcsafe).}: + for i in 0.. 1: + handle.cancel = true + + check: + handle.cancel == true + handle.closed == true + res.len == 2 + + res[0].key.get == key2 + res[0].data == val2 + + res[1].key.get == key1 + res[1].data == val1 + + test "Key should query all keys without values": + let + q = dbQuery(key= key1, value= false) + + ds.put(key1, val1).tryGet + ds.put(key2, val2).tryGet + ds.put(key3, val3).tryGet + + var + handle = ds.query(q).tryGet + let + res = handle.queryIter().toSeq().mapIt(it.tryGet()) + + check: + res.len == 3 + res[0].key.get == key1 + res[0].data.len == 0 + + res[1].key.get == key2 + res[1].data.len == 0 + + res[2].key.get == key3 + res[2].data.len == 0 + + + test "Key should not query parent": + let + q = dbQuery(key= key2, value= true) + + ds.put(key1, val1).tryGet + ds.put(key2, val2).tryGet + ds.put(key3, val3).tryGet + + var + handle = ds.query(q).tryGet + let + res = handle.queryIter().toSeq().mapIt(it.tryGet()) + # ).filterIt(it.isOk).mapIt(it.tryGet()) + + check: + res.len == 2 + res[0].key.get == key2 + res[0].data == val2 + + res[1].key.get == key3 + res[1].data == val3 + + test "Key should all list all keys at the same level": + let + queryKey = Key.init("/a").tryGet + q = dbQuery(key= key1, value= true) + + ds.put(key1, val1).tryGet + ds.put(key2, val2).tryGet + ds.put(key3, val3).tryGet + + var + handle = ds.query(q).tryGet + res = handle.queryIter().toSeq().mapIt(it.tryGet()) + + res.sort do (a, b: DbQueryResponse[KeyId, DataBuffer]) -> int: + cmp($a.key.get, $b.key.get) + + check: + res.len == 3 + res[0].key.get == key1 + res[0].data == val1 + + res[1].key.get == key2 + res[1].data == val2 + + res[2].key.get == key3 + res[2].data == val3 + + if extended: + test "Should apply limit": + let + key = Key.init("/a").tryGet + q = dbQuery(key= key1, limit= 10, value= false) + + for i in 0..<100: + let + key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet + val = DataBuffer.new("val " & $i) + + ds.put(key, val).tryGet + + var + handle = ds.query(q).tryGet + let + res = handle.queryIter().toSeq().mapIt(it.tryGet()) + + check: + res.len == 10 + + test "Should not apply offset": + let + key = Key.init("/a").tryGet + keyId = KeyId.new $key + q = dbQuery(key= keyId, offset= 90) + + for i in 0..<100: + let + key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet + val = DataBuffer.new("val " & $i) + + ds.put(key, val).tryGet + + 
var + qr = ds.query(q) + # echo "RES: ", qr.repr + + var + handle = ds.query(q).tryGet + let + res = handle.queryIter().toSeq().mapIt(it.tryGet()) + + # echo "RES: ", res.mapIt(it.key) + check: + res.len == 10 + + test "Should not apply offset and limit": + let + key = Key.init("/a").tryGet + keyId = KeyId.new $key + q = dbQuery(key= keyId, offset= 95, limit= 5) + + for i in 0..<100: + let + key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet + val = DataBuffer.new("val " & $i) + + ds.put(key, val).tryGet + + var + handle = ds.query(q).tryGet + res = handle.queryIter().toSeq().mapIt(it.tryGet()) + + check: + res.len == 5 + + for i in 0.. int: + cmp($a.key.get, $b.key.get) + + kvs = kvs.reversed + var + handle = ds.query(q).tryGet + res = handle.queryIter().toSeq().mapIt(it.tryGet()) + + check: + res.len == 100 + + for i, r in res[1..^1]: + check: + res[i].key.get == kvs[i].key.get + res[i].data == kvs[i].data \ No newline at end of file diff --git a/tests/datastore/threads/testsqlbackend.nim b/tests/datastore/threads/testsqlbackend.nim new file mode 100644 index 00000000..03a20cff --- /dev/null +++ b/tests/datastore/threads/testsqlbackend.nim @@ -0,0 +1,64 @@ +import std/options +import std/os +import std/sequtils +from std/algorithm import sort, reversed + +import pkg/unittest2 +import pkg/chronos +import pkg/stew/results +import pkg/stew/byteutils + +import pkg/datastore/threads/sqlbackend +import pkg/datastore/key + +import ./backendCommonTests + +suite "Test Basic SQLiteDatastore": + let + ds = newSQLiteBackend[string, seq[byte]](path=Memory).tryGet() + keyFull = Key.init("a:b/c/d:e").tryGet() + key = keyFull.id() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes + + var batch: seq[tuple[key: string, data: seq[byte]]] + for k in 0..<100: + let kk = Key.init(key, $k).tryGet().id() + batch.add( (kk, @[k.byte]) ) + + suiteTeardown: + ds.close().tryGet() + + testBasicBackend(ds, key, bytes, otherBytes, batch) + +suite "Test DataBuffer SQLiteDatastore": + let + ds = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + keyFull = Key.init("a:b/c/d:e").tryGet() + key = KeyId.new keyFull.id() + bytes = DataBuffer.new "some bytes" + otherBytes = DataBuffer.new "some other bytes" + + var batch: seq[tuple[key: KeyId, data: DataBuffer]] + for k in 0..<100: + let kk = Key.init(keyFull.id(), $k).tryGet().id() + batch.add( (KeyId.new kk, DataBuffer.new @[k.byte]) ) + + suiteTeardown: + ds.close().tryGet() + + testBasicBackend(ds, key, bytes, otherBytes, batch) + +suite "queryTests": + + let + dsNew = proc(): SQLiteBackend[KeyId, DataBuffer] = + newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + key1 = KeyId.new "/a" + key2 = KeyId.new "/a/b" + key3 = KeyId.new "/a/b/c" + val1 = DataBuffer.new "value for 1" + val2 = DataBuffer.new "value for 2" + val3 = DataBuffer.new "value for 3" + + queryTests(dsNew, key1, key2, key3, val1, val2, val3, extended=true) diff --git a/tests/datastore/threads/testthreadproxy.nim b/tests/datastore/threads/testthreadproxy.nim new file mode 100644 index 00000000..6851566f --- /dev/null +++ b/tests/datastore/threads/testthreadproxy.nim @@ -0,0 +1,236 @@ +import std/options +import std/sequtils +import std/os +import std/cpuinfo +import std/algorithm +import std/importutils + +import pkg/asynctest +import pkg/chronos +import pkg/chronos/threadsync +import pkg/stew/results +import pkg/stew/byteutils +import pkg/taskpools +import pkg/questionable/results +import pkg/threading/atomics + +import pkg/datastore/sql +import pkg/datastore/fsds 
+import pkg/datastore/threads/threadproxy + +import ../dscommontests +import ../querycommontests + +const + NumThreads = 20 # IO threads aren't attached to CPU count + ThreadTestLoops {.intdefine.} = 1 + N = ThreadTestLoops + ThreadTestInnerLoops {.intdefine.} = 1 + M = ThreadTestInnerLoops + +var + taskPool: Taskpool = Taskpool.new(NumThreads) + +for i in 1..N: + suite "Test Basic ThreadDatastore with SQLite " & $i: + var + ds: SQLiteDatastore + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes + + setupAll: + ds = SQLiteDatastore.new(Memory, tp=taskPool).tryGet() + + teardown: + GC_fullCollect() + + teardownAll: + (await ds.close()).tryGet() + + for i in 1..M: + basicStoreTests(ds, key, bytes, otherBytes) + GC_fullCollect() + + +for i in 1..N: + suite "Test Query ThreadDatastore with SQLite " & $i: + + var + ds: SQLiteDatastore + + setup: + ds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet() + + teardown: + GC_fullCollect() + + (await ds.close()).tryGet() + + for i in 1..M: + queryTests(ds, true) + GC_fullCollect() + +suite "Test Basic ThreadDatastore with fsds": + let + path = currentSourcePath() # get this file's name + basePath = "tests_data" + basePathAbs = path.parentDir / basePath + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes + + var + ds: FSDatastore + + setupAll: + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + createDir(basePathAbs) + + ds = FSDatastore.new(root=basePathAbs, tp=taskPool, depth=5).tryGet() + + teardown: + GC_fullCollect() + + teardownAll: + (await ds.close()).tryGet() + + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + + basicStoreTests(ds, key, bytes, otherBytes) + +suite "Test Query ThreadDatastore with fsds": + let + path = currentSourcePath() # get this file's name + basePath = "tests_data" + basePathAbs = path.parentDir / basePath + + var + ds: FSDatastore + + setup: + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + createDir(basePathAbs) + + ds = FSDatastore.new(root=basePathAbs, tp = taskPool, depth=5).tryGet() + + teardown: + GC_fullCollect() + (await ds.close()).tryGet() + + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + + queryTests(ds, false) + +when datastoreUseAsync: + for i in 1..N: + suite "Test ThreadDatastore cancelations": + + privateAccess(SQLiteDatastore) # expose private fields + privateAccess(ThreadProxy) # expose private fields + privateAccess(TaskCtx) # expose private fields + + var sds: SQLiteDatastore + + setupAll: + sds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet() + + teardown: + GC_fullCollect() # run full collect after each test + + test "Should monitor signal and cancel": + var + signal = ThreadSignalPtr.new().tryGet() + + proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} = + executeTask(ctx): + (?!bool).ok(true) + + let ctx = newTaskCtx(bool, signal=signal) + ctx[].cancelled = true + dispatchTask(sds.db, signal): + sds.db.tp.spawn cancelTestTask(ctx) + + check: + ctx[].res.isErr == true + ctx[].cancelled == true + ctx[].running == false + + test "Should cancel future": + + var + signal = ThreadSignalPtr.new().tryGet() + ms {.global.}: MutexSignal + flag {.global.}: Atomic[bool] + futFreed {.global.}: Atomic[bool] + ready {.global.}: Atomic[bool] + + ms.init() + + type + FutTestObj = object + val: int + TestValue = object + ThreadTestInt = (TestValue, ) + + proc `=destroy`(obj: var TestValue) = + # echo "destroy TestObj!" 
+          flag.store(true)
+
+        proc `=destroy`(obj: var FutTestObj) =
+          # echo "destroy FutTestObj!"
+          futFreed.store(true)
+
+        proc wait(flag: var Atomic[bool], name = "task") =
+          # echo "wait for " & name & " to be ready..."
+          # defer: echo ""
+          for i in 1..100:
+            # stdout.write(".")
+            if flag.load() == true:
+              return
+            os.sleep(10)
+          raise newException(Defect, "timeout")
+
+        proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} =
+          executeTask(ctx):
+            # echo "task:exec"
+            discard ctx[].signal.fireSync()
+            ready.store(true)
+            ms.wait()
+            echo "task context memory: ", ctx[]
+            (?!ThreadTestInt).ok(default(ThreadTestInt))
+
+        proc runTestTask() {.async.} =
+          let obj = FutTestObj(val: 42)
+          await sleepAsync(1.milliseconds)
+          try:
+            let ctx = newTaskCtx(ThreadTestInt, signal=signal)
+            dispatchTask(sds.db, signal):
+              sds.db.tp.spawn errorTestTask(ctx)
+            ready.wait()
+            # echo "raise error"
+            raise newException(ValueError, "fake error")
+          finally:
+            # echo "fut FutTestObj: ", obj
+            assert obj.val == 42 # need to force future to keep ref here
+        try:
+          block:
+            await runTestTask()
+        except CatchableError as exc:
+          # echo "caught: ", $exc
+          discard
+        finally:
+          # echo "finish"
+          check ready.load() == true
+          GC_fullCollect()
+          futFreed.wait("futFreed")
+          echo "future freed its memory!"
+          check futFreed.load() == true
+
+        ms.fire()
+        flag.wait("flag")
+        check flag.load() == true
diff --git a/tests/testall.nim b/tests/testall.nim
index a6aca018..0ac781eb 100644
--- a/tests/testall.nim
+++ b/tests/testall.nim
@@ -1,9 +1,14 @@
 import
   ./datastore/testkey,
   ./datastore/testdatastore,
-  ./datastore/testfsds,
   ./datastore/testsql,
+  ./datastore/testfsds,
   ./datastore/testtieredds,
-  ./datastore/testmountedds
+  ./datastore/testmountedds,
+  ./datastore/testdatabuffer,
+  ./datastore/threads/testsqlbackend,
+  ./datastore/threads/testthreadproxy,
+  ./datastore/testasyncsemaphore,
+  ./datastore/testsemaphore

 {.warning[UnusedImport]: off.}
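
Taken together, the new pieces compose as follows. A minimal end-to-end sketch, assuming the module layout introduced in this PR and compilation with --threads:on (not a test from this change set):

import pkg/chronos
import pkg/taskpools
import pkg/questionable/results
import pkg/stew/byteutils
import pkg/datastore/key
import pkg/datastore/threads/sqlbackend
import pkg/datastore/threads/threadproxy

proc main() {.async.} =
  let
    tp = Taskpool.new(4)  # worker threads for spawned backend tasks
    backend = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
    proxy = ThreadProxy.new(backend, tp = tp).tryGet()
    key = Key.init("/a/b").tryGet()

  (await proxy.put(key, "some bytes".toBytes)).tryGet()
  assert (await proxy.get(key)).tryGet() == "some bytes".toBytes
  (await proxy.close()).tryGet()  # also closes the wrapped backend

waitFor main()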