diff --git a/datastore.nimble b/datastore.nimble
index 3b5b28ef..a6505981 100644
--- a/datastore.nimble
+++ b/datastore.nimble
@@ -9,12 +9,12 @@ license = "Apache License 2.0 or MIT"
 requires "nim >= 1.6.14",
          "asynctest >= 0.3.1 & < 0.4.0",
          "chronos#0277b65be2c7a365ac13df002fba6e172be55537",
-         "questionable >= 0.10.3 & < 0.11.0",
+         "questionable#head",
          "sqlite3_abi",
          "stew",
          "unittest2",
          "pretty",
-         "threading",
+         "https://github.com/elcritch/threading#test-smartptrsleak",
          "taskpools",
          "upraises >= 0.1.0 & < 0.2.0",
          "chronicles"
diff --git a/datastore/backend.nim b/datastore/backend.nim
index 5790b121..410f5b11 100644
--- a/datastore/backend.nim
+++ b/datastore/backend.nim
@@ -51,8 +51,9 @@ proc dbQuery*[K](
 
 proc `$`*(id: KeyId): string = $(id.data)
 
-proc toKey*(tp: typedesc[KeyId], id: cstring): KeyId = KeyId.new(id)
-proc toKey*(tp: typedesc[string], id: cstring): string = $(id)
+proc toKey*(tp: typedesc[KeyId], id: string|cstring): KeyId = KeyId.new($id)
+proc toKey*(tp: typedesc[string], id: string|cstring): string = $(id)
+# proc toKey*(tp: typedesc[Key], id: string|cstring): KeyId = Key.init($id).expect("valid key")
 
 template toVal*(tp: typedesc[DataBuffer], id: openArray[byte]): DataBuffer = DataBuffer.new(id)
 template toVal*(tp: typedesc[seq[byte]], id: openArray[byte]): seq[byte] = @(id)
@@ -62,6 +63,3 @@ proc new*(tp: typedesc[KeyId], id: cstring): KeyId =
 
 proc new*(tp: typedesc[KeyId], id: string): KeyId =
   KeyId(data: DataBuffer.new(id))
-
-template toOpenArray*(x: DbKey): openArray[char] =
-  x.data.toOpenArray(char)
diff --git a/datastore/fsds.nim b/datastore/fsds.nim
index 65f5cdd4..e3a6464a 100644
--- a/datastore/fsds.nim
+++ b/datastore/fsds.nim
@@ -2,12 +2,12 @@ import std/os
 import std/options
 import std/strutils
 
-import pkg/chronos
 import pkg/questionable
 import pkg/questionable/results
 from pkg/stew/results as stewResults import get, isErr
 import pkg/upraises
 
+import ./backend
 import ./datastore
 
 export datastore
@@ -15,22 +15,23 @@ export datastore
 push: {.upraises: [].}
 
 type
-  FSDatastore* = ref object of Datastore
-    root*: string
+  FSDatastore*[K, V] = object
+    root*: DataBuffer
     ignoreProtected: bool
     depth: int
 
+proc isRootSubdir*(root, path: string): bool =
+  path.startsWith(root)
+
 proc validDepth*(self: FSDatastore, key: Key): bool =
   key.len <= self.depth
 
-proc isRootSubdir*(self: FSDatastore, path: string): bool =
-  path.startsWith(self.root)
-
-proc path*(self: FSDatastore, key: Key): ?!string =
+proc findPath*[K,V](self: FSDatastore[K,V], key: K): ?!string =
   ## Return filename corresponding to the key
   ## or failure if the key doesn't correspond to a valid filename
   ##
-
+  let root = $self.root
+  let key = Key.init($key).get()
   if not self.validDepth(key):
     return failure "Path has invalid depth!"
 
@@ -53,22 +54,27 @@ proc path*(self: FSDatastore, key: Key): ?!string =
     segments.add(ns.field / ns.value)
 
   let
-    fullname = (self.root / segments.joinPath())
+    fullname = (root / segments.joinPath())
       .absolutePath()
       .catch()
      .get()
      .addFileExt(FileExt)
 
-  if not self.isRootSubdir(fullname):
+  if not root.isRootSubdir(fullname):
     return failure "Path is outside of `root` directory!"
 
   return success fullname
 
-method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} =
-  return self.path(key).?fileExists()
+proc has*[K,V](self: FSDatastore[K,V], key: K): ?!bool =
+  without path =? self.findPath(key), error:
+    return failure error
+  success path.fileExists()
+
+proc contains*[K](self: FSDatastore, key: K): bool =
+  return self.has(key).get()
 
-method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
-  without path =? self.path(key), error:
+proc delete*[K,V](self: FSDatastore[K,V], key: K): ?!void =
+  without path =? self.findPath(key), error:
     return failure error
 
   if not path.fileExists():
@@ -81,14 +87,14 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
 
   return success()
 
-method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} =
+proc delete*[K,V](self: FSDatastore[K,V], keys: openArray[K]): ?!void =
   for key in keys:
-    if err =? (await self.delete(key)).errorOption:
+    if err =? self.delete(key).errorOption:
       return failure err
 
   return success()
 
-proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
+proc readFile[V](self: FSDatastore, path: string): ?!V =
   var
     file: File
 
@@ -96,18 +102,24 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
     file.close
 
   if not file.open(path):
-    return failure "unable to open file!"
+    return failure "unable to open file! path: " & path
 
   try:
     let
-      size = file.getFileSize
+      size = file.getFileSize().int
+
+    when V is seq[byte]:
+      var bytes = newSeq[byte](size)
+    elif V is V:
+      var bytes = V.new(size=size)
+    else:
+      {.error: "unhandled result type".}
 
     var
-      bytes = newSeq[byte](size)
       read = 0
 
+    # echo "BYTES: ", bytes.repr
     while read < size:
-      read += file.readBytes(bytes, read, size)
+      read += file.readBytes(bytes.toOpenArray(0, size-1), read, size)
 
     if read < size:
       return failure $read & " bytes were read from " & path &
@@ -118,61 +130,71 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
   except CatchableError as e:
     return failure e
 
-method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
-  without path =? self.path(key), error:
+proc get*[K,V](self: FSDatastore[K,V], key: K): ?!V =
+  without path =? self.findPath(key), error:
     return failure error
 
   if not path.fileExists():
     return failure(
       newException(DatastoreKeyNotFound, "Key doesn't exist"))
 
-  return self.readFile(path)
+  return readFile[V](self, path)
 
-method put*(
-  self: FSDatastore,
-  key: Key,
-  data: seq[byte]): Future[?!void] {.async.} =
+proc put*[K,V](self: FSDatastore[K,V],
+               key: K,
+               data: V
+              ): ?!void =
 
-  without path =? self.path(key), error:
+  without path =? self.findPath(key), error:
     return failure error
 
   try:
+    var data = data
     createDir(parentDir(path))
-    writeFile(path, data)
+    writeFile(path, data.toOpenArray(0, data.len()-1))
   except CatchableError as e:
     return failure e
 
   return success()
 
-method put*(
+proc put*[K,V](
   self: FSDatastore,
-  batch: seq[BatchEntry]): Future[?!void] {.async.} =
+  batch: seq[DbBatchEntry[K, V]]): ?!void =
 
   for entry in batch:
-    if err =? (await self.put(entry.key, entry.data)).errorOption:
+    if err =? self.put(entry.key, entry.data).errorOption:
       return failure err
 
   return success()
 
-proc dirWalker(path: string): iterator: string {.gcsafe.} =
-  var localPath {.threadvar.}: string
-
-  localPath = path
-  return iterator(): string =
-    try:
-      for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
-        yield p
-    except CatchableError as exc:
-      raise newException(Defect, exc.msg)
+iterator dirIter(path: string): string {.gcsafe.} =
+  try:
+    for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
+      yield p
+  except CatchableError as exc:
+    raise newException(Defect, exc.msg)
 
-method close*(self: FSDatastore): Future[?!void] {.async.} =
+proc close*[K,V](self: FSDatastore[K,V]): ?!void =
   return success()
 
-method query*(
-  self: FSDatastore,
-  query: Query): Future[?!QueryIter] {.async.} =
-
-  without path =? self.path(query.key), error:
+type
+  FsQueryHandle*[K, V] = object
+    query*: DbQuery[K]
+    cancel*: bool
+    closed*: bool
+    env*: FsQueryEnv[K,V]
+
+  FsQueryEnv*[K,V] = object
+    self: FSDatastore[K,V]
+    basePath: DataBuffer
+
+proc query*[K,V](
+  self: FSDatastore[K,V],
+  query: DbQuery[K],
+): Result[FsQueryHandle[K, V], ref CatchableError] =
+
+  let key = query.key
+  without path =? self.findPath(key), error:
     return failure error
 
   let basePath =
@@ -184,61 +206,58 @@ method query*(
       path.parentDir
     else:
       path.changeFileExt("")
+
+  let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath))
+  success FsQueryHandle[K, V](query: query, env: env)
 
-  let
-    walker = dirWalker(basePath)
-
-  var
-    iter = QueryIter.new()
+proc close*[K,V](handle: var FsQueryHandle[K,V]) =
+  if not handle.closed:
+    handle.closed = true
 
-  var lock = newAsyncLock() # serialize querying under threads
-  proc next(): Future[?!QueryResponse] {.async.} =
-    defer:
-      if lock.locked:
-        lock.release()
-
-    if lock.locked:
-      return failure (ref DatastoreError)(msg: "Should always await query features")
-
-    let
-      path = walker()
+iterator queryIter*[K, V](
+  handle: var FsQueryHandle[K, V]
+): ?!DbQueryResponse[K, V] =
+  let root = $(handle.env.self.root)
+  let basePath = $(handle.env.basePath)
 
-    if iter.finished:
-      return failure "iterator is finished"
-
-    await lock.acquire()
-
-    if finished(walker):
-      iter.finished = true
-      return success (Key.none, EmptyBytes)
+  for path in basePath.dirIter():
+    if handle.cancel:
+      break
 
     var
+      basePath = $handle.env.basePath
       keyPath = basePath
 
-    keyPath.removePrefix(self.root)
+    keyPath.removePrefix(root)
     keyPath = keyPath / path.changeFileExt("")
     keyPath = keyPath.replace("\\", "/")
 
     let
-      key = Key.init(keyPath).expect("should not fail")
+      flres = (basePath / path).absolutePath().catch
+    if flres.isErr():
+      yield DbQueryResponse[K,V].failure flres.error()
+      continue
+
+    let
+      key = K.toKey($Key.init(keyPath).expect("valid key"))
       data =
-        if query.value:
-          self.readFile((basePath / path).absolutePath)
-            .expect("Should read file")
+        if handle.query.value:
+          let res = readFile[V](handle.env.self, flres.get)
+          if res.isErr():
+            yield DbQueryResponse[K,V].failure res.error()
+            continue
+          res.get()
         else:
-          @[]
-
-    return success (key.some, data)
+          V.new()
 
-  iter.next = next
-  return success iter
+    yield success (key.some, data)
+  handle.close()
 
-proc new*(
-  T: type FSDatastore,
-  root: string,
-  depth = 2,
-  caseSensitive = true,
-  ignoreProtected = false): ?!T =
+proc newFSDatastore*[K,V](root: string,
+                          depth = 2,
+                          caseSensitive = true,
+                          ignoreProtected = false
+                         ): ?!FSDatastore[K,V] =
 
   let root = ? (
     block:
@@ -248,7 +267,7 @@ proc new*(
   if not dirExists(root):
     return failure "directory does not exist: " & root
 
-  success T(
-    root: root,
+  success FSDatastore[K,V](
+    root: DataBuffer.new root,
     ignoreProtected: ignoreProtected,
     depth: depth)
diff --git a/datastore/sql.nim b/datastore/sql.nim
index 28b1f3d0..cfe67ac6 100644
--- a/datastore/sql.nim
+++ b/datastore/sql.nim
@@ -60,10 +60,9 @@ method put*(self: SQLiteDatastore,
 method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
   self.db.close()
 
-method queryIter*(
-  self: SQLiteDatastore,
-  query: Query
-): ?!(iterator(): ?!QueryResponse) =
+method queryIter*(self: SQLiteDatastore,
+                  query: Query
+                 ): ?!(iterator(): ?!QueryResponse) =
 
   let dbquery = dbQuery(
     key= KeyId.new query.key.id(),
@@ -75,7 +74,7 @@ method queryIter*(
   var qhandle = ? self.db.query(dbquery)
 
   let iter = iterator(): ?!QueryResponse =
-    for resp in qhandle.iter():
+    for resp in qhandle.queryIter():
       without qres =? resp, err:
         yield QueryResponse.failure err
       let k = qres.key.map() do(k: KeyId) -> Key:
diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim
index feb0785b..79d4a31c 100644
--- a/datastore/sql/sqliteds.nim
+++ b/datastore/sql/sqliteds.nim
@@ -103,10 +103,17 @@ proc close*[K,V](self: SQLiteBackend[K,V]): ?!void =
 
   return success()
 
+type
+  SqQueryHandle*[K, V] = object
+    query*: DbQuery[K]
+    cancel*: bool
+    closed*: bool
+    env*: RawStmtPtr
+
 proc query*[K,V](
   self: SQLiteBackend[K,V],
   query: DbQuery[K]
-): Result[DbQueryHandle[K,V,RawStmtPtr], ref CatchableError] =
+): Result[SqQueryHandle[K,V], ref CatchableError] =
 
   var
     queryStr = if query.value:
@@ -151,16 +158,18 @@ proc query*[K,V](
     if not (v == SQLITE_OK):
       return failure newException(DatastoreError, $sqlite3_errstr(v))
 
-  success DbQueryHandle[K,V,RawStmtPtr](query: query, env: s)
+  success SqQueryHandle[K,V](query: query, env: s)
 
-proc close*[K,V](handle: var DbQueryHandle[K,V,RawStmtPtr]) =
+proc close*[K,V](handle: var SqQueryHandle[K,V]) =
   if not handle.closed:
     handle.closed = true
     discard sqlite3_reset(handle.env)
     discard sqlite3_clear_bindings(handle.env)
     handle.env.dispose()
 
-iterator iter*[K, V](handle: var DbQueryHandle[K, V, RawStmtPtr]): ?!DbQueryResponse[K, V] =
+iterator queryIter*[K, V](
+  handle: var SqQueryHandle[K, V]
+): ?!DbQueryResponse[K, V] =
   while not handle.cancel:
     let
       v = sqlite3_step(handle.env)
@@ -227,7 +236,7 @@ proc newSQLiteBackend*[K,V](
 
   success SQLiteBackend[K,V](db: ? SQLiteDsDb[K,V].open(path, flags))
 
-proc newSQLiteBackend*[K,V](
-  db: SQLiteDsDb[K,V]): ?!SQLiteBackend[K,V] =
+proc newSQLiteBackend*[K,V](db: SQLiteDsDb[K,V]
+                           ): ?!SQLiteBackend[K,V] =
   success SQLiteBackend[K,V](db: db)
diff --git a/datastore/threads/databuffer.nim b/datastore/threads/databuffer.nim
index 5ea7b4be..5f80cbfe 100644
--- a/datastore/threads/databuffer.nim
+++ b/datastore/threads/databuffer.nim
@@ -47,14 +47,14 @@ template `==`*[T: char | byte](a: DataBuffer, b: openArray[T]): bool =
   elif a[].size != b.len: false
   else: a.hash() == b.hash()
 
-proc new(tp: type DataBuffer, capacity: int = 0): DataBuffer =
+proc new*(tp: type DataBuffer, size: int = 0): DataBuffer =
   ## allocate new buffer with given capacity
   ##
   newSharedPtr(DataBufferHolder(
-    buf: cast[typeof(result[].buf)](allocShared0(capacity)),
-    size: 0,
-    cap: capacity,
+    buf: cast[typeof(result[].buf)](allocShared0(size)),
+    size: size,
+    cap: size,
   ))
 
 proc new*[T: byte | char](tp: type DataBuffer, data: openArray[T], opts: set[DataBufferOpt] = {}): DataBuffer =
@@ -130,9 +130,12 @@ converter toBuffer*(err: ref CatchableError): DataBuffer =
 
   return DataBuffer.new(err.msg)
 
-template toOpenArray*[T: byte | char](data: DataBuffer, t: typedesc[T]): openArray[T] =
+template toOpenArray*[T: byte | char](data: var DataBuffer, t: typedesc[T]): var openArray[T] =
   ## get openArray from DataBuffer as char
   ##
   ## this is explicit since sqlite treats string differently from openArray[byte]
-  let bf = cast[ptr UncheckedArray[T]](data[].buf)
+  var bf = cast[ptr UncheckedArray[T]](data[].buf)
   bf.toOpenArray(0, data[].size-1)
+
+template toOpenArray*(data: var DataBuffer, first, last: int): var openArray[byte] =
+  toOpenArray(data, byte).toOpenArray(first, last)
diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim
index dcc16800..e1a75718 100644
--- a/datastore/threads/threadproxyds.nim
+++ b/datastore/threads/threadproxyds.nim
@@ -23,6 +23,7 @@ import ../key
 import ../query
 import ../datastore
 import ../backend
+import ../fsds
 import ../sql/sqliteds
 
 import ./asyncsemaphore
@@ -36,57 +37,48 @@ logScope:
 
 type
-  ThreadBackendKinds* = enum
-    Sqlite
-    # Filesystem
-
-  ThreadBackend* = object
-    ## backend case type to avoid needing to make ThreadDatastore generic
-    case kind*: ThreadBackendKinds
-    of Sqlite:
-      sql*: SQLiteBackend[KeyId,DataBuffer]
-
   TaskCtxObj*[T: ThreadTypes] = object
-    res: ThreadResult[T]
+    res*: ThreadResult[T]
     signal: ThreadSignalPtr
-    running: bool ## used to mark when a task worker is running
-    cancelled: bool ## used to cancel a task before it's started
+    running*: bool ## used to mark when a task worker is running
+    cancelled*: bool ## used to cancel a task before it's started
+    nextSignal: ThreadSignalPtr
 
   TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
     ## Task context object.
    ## This is a SharedPtr to make the query iter simpler
 
-  ThreadDatastore* = ref object of Datastore
+  ThreadDatastore*[BT] = ref object of Datastore
     tp: Taskpool
-    backend: ThreadBackend
+    backend: BT
     semaphore: AsyncSemaphore # semaphore is used for backpressure \
                               # to avoid exhausting file descriptors
 
-var ctxLock: Lock
-ctxLock.initLock()
+proc newTaskCtx*[T](tp: typedesc[T],
+                    signal: ThreadSignalPtr,
+                    nextSignal: ThreadSignalPtr = nil): TaskCtx[T] =
+  newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal))
 
 proc setCancelled[T](ctx: TaskCtx[T]) =
-  # withLock(ctxLock):
   ctx[].cancelled = true
 
 proc setRunning[T](ctx: TaskCtx[T]): bool =
-  # withLock(ctxLock):
   if ctx[].cancelled:
     return false
   ctx[].running = true
   return true
 
 proc setDone[T](ctx: TaskCtx[T]) =
-  # withLock(ctxLock):
   ctx[].running = false
 
 proc acquireSignal(): ?!ThreadSignalPtr =
+  # echo "signal:OPEN!"
   let signal = ThreadSignalPtr.new()
   if signal.isErr():
     failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error())
   else:
     success signal.get()
 
-template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
+template executeTask*[T](ctx: TaskCtx[T], blk: untyped) =
   ## executes a task on a thread work and handles cleanup after cancels/errors
   ##
   try:
@@ -113,74 +105,76 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
     ctx.setDone()
     discard ctx[].signal.fireSync()
 
-template dispatchTaskWrap[T](self: ThreadDatastore,
+template dispatchTaskWrap[BT](self: ThreadDatastore[BT],
+                              signal: ThreadSignalPtr,
+                              blk: untyped
+                             ): auto =
+  var ds {.used, inject.} = self.backend
+  proc runTask() =
+    `blk`
+  runTask()
+  await wait(ctx[].signal)
+
+template dispatchTask*[BT](self: ThreadDatastore[BT],
                            signal: ThreadSignalPtr,
                            blk: untyped
-                          ): auto =
-  case self.backend.kind:
-  of Sqlite:
-    var ds {.used, inject.} = self.backend.sql
-    proc runTask() =
-      `blk`
-    runTask()
-    await wait(ctx[].signal)
-
-template dispatchTask[T](self: ThreadDatastore,
-                         signal: ThreadSignalPtr,
-                         blk: untyped
-                        ): auto =
+                          ): auto =
   ## handles dispatching a task from an async context
   ## `blk` is the actions, it has `ctx` and `ds` variables in scope.
   ## note that `ds` is a generic
-  let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
   try:
-    dispatchTaskWrap[T](self, signal, blk)
+    dispatchTaskWrap[BT](self, signal, blk)
   except CancelledError as exc:
     trace "Cancelling thread future!", exc = exc.msg
     ctx.setCancelled()
     raise exc
+  except CatchableError as exc:
+    ctx.setCancelled()
+    raise exc
   finally:
     discard ctx[].signal.close()
     self.semaphore.release()
 
-
 proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
   ## run backend command
   executeTask(ctx):
     has(ds, key)
 
-method has*(self: ThreadDatastore,
-            key: Key): Future[?!bool] {.async.} =
+method has*[BT](self: ThreadDatastore[BT],
+                key: Key): Future[?!bool] {.async.} =
   await self.semaphore.acquire()
-  without signal =? acquireSignal(), err:
-    return failure err
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
 
-  let key = KeyId.new key.id()
-  dispatchTask[bool](self, signal):
+  let ctx = newTaskCtx(bool, signal=signal)
+  dispatchTask(self, signal):
+    let key = KeyId.new key.id()
     self.tp.spawn hasTask(ctx, ds, key)
   return ctx[].res.toRes(v => v)
 
-
 proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe.} =
   ## run backend command
   executeTask(ctx):
     delete(ds, key)
 
-method delete*(self: ThreadDatastore,
+method delete*[BT](self: ThreadDatastore[BT],
                key: Key): Future[?!void] {.async.} =
   ## delete key
   await self.semaphore.acquire()
-  without signal =? acquireSignal(), err:
-    return failure err
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
 
-  let key = KeyId.new key.id()
-  dispatchTask[void](self, signal):
+  let ctx = newTaskCtx(void, signal=signal)
+  dispatchTask(self, signal):
+    let key = KeyId.new key.id()
     self.tp.spawn deleteTask(ctx, ds, key)
   return ctx[].res.toRes()
 
-method delete*(self: ThreadDatastore,
+method delete*[BT](self: ThreadDatastore[BT],
                keys: seq[Key]): Future[?!void] {.async.} =
   ## delete batch
   for key in keys:
@@ -196,23 +190,25 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
   executeTask(ctx):
     put(ds, key, data)
 
-method put*(self: ThreadDatastore,
+method put*[BT](self: ThreadDatastore[BT],
             key: Key,
             data: seq[byte]): Future[?!void] {.async.} =
   ## put key with data
   await self.semaphore.acquire()
-  without signal =? acquireSignal(), err:
-    return failure err
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
 
-  dispatchTask[void](self, signal):
+  let ctx = newTaskCtx(void, signal=signal)
+  dispatchTask(self, signal):
     let key = KeyId.new key.id()
     let data = DataBuffer.new data
     self.tp.spawn putTask(ctx, ds, key, data)
   return ctx[].res.toRes()
 
-method put*(
-  self: ThreadDatastore,
+method put*[DB](
+  self: ThreadDatastore[DB],
   batch: seq[BatchEntry]): Future[?!void] {.async.} =
   ## put batch data
   for entry in batch:
@@ -229,41 +225,41 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
     let res = get(ds, key)
     res
 
-method get*(self: ThreadDatastore,
-            key: Key,
-            ): Future[?!seq[byte]] {.async.} =
+method get*[BT](self: ThreadDatastore[BT],
+                key: Key,
+               ): Future[?!seq[byte]] {.async.} =
   await self.semaphore.acquire()
-  without signal =? acquireSignal(), err:
-    return failure err
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
 
-  let key = KeyId.new key.id()
-  dispatchTask[DataBuffer](self, signal):
+  let ctx = newTaskCtx(DataBuffer, signal=signal)
+  dispatchTask(self, signal):
+    let key = KeyId.new key.id()
     self.tp.spawn getTask(ctx, ds, key)
   return ctx[].res.toRes(v => v.toSeq())
 
-method close*(self: ThreadDatastore): Future[?!void] {.async.} =
+method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
   await self.semaphore.closeAll()
-  case self.backend.kind:
-  of Sqlite:
-    self.backend.sql.close()
+  self.backend.close()
 
 type
   QResult = DbQueryResponse[KeyId, DataBuffer]
 
-import os
-
 proc queryTask[DB](
   ctx: TaskCtx[QResult],
   ds: DB,
   query: DbQuery[KeyId],
-  nextSignal: ThreadSignalPtr
-) {.gcsafe, nimcall.} =
+) =
   ## run query command
+  mixin queryIter
   executeTask(ctx):
     # we execute this all inside `executeTask`
     # so we need to return a final result
-    let handleRes = ds.query(query)
+    let handleRes = query(ds, query)
+    static:
+      echo "HANDLE_RES: ", typeof(handleRes)
     if handleRes.isErr():
       # set error and exit executeTask, which will fire final signal
       (?!QResult).err(handleRes.error())
@@ -271,11 +267,13 @@ proc queryTask[DB](
       # otherwise manually an set empty ok result
       ctx[].res.ok (KeyId.none, DataBuffer(), )
       discard ctx[].signal.fireSync()
-      if not nextSignal.waitSync(10.seconds).get():
-        raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
+      if not ctx[].nextSignal.waitSync(10.seconds).get():
+        raise newException(DeadThreadDefect, "queryTask timed out")
 
       var handle = handleRes.get()
-      for item in handle.iter():
+      static:
+        echo "HANDLE: ", typeof(handle)
+      for item in handle.queryIter():
 
         # wait for next request from async thread
         if ctx[].cancelled:
@@ -287,23 +285,33 @@ proc queryTask[DB](
             exc
 
         discard ctx[].signal.fireSync()
-
-        discard nextSignal.waitSync().get()
+        if not ctx[].nextSignal.waitSync(10.seconds).get():
+          raise newException(DeadThreadDefect, "queryTask timed out")
 
       # set final result
       (?!QResult).ok((KeyId.none, DataBuffer()))
 
-method query*(self: ThreadDatastore,
+method query*[BT](self: ThreadDatastore[BT],
               q: Query
               ): Future[?!QueryIter] {.async.} =
   ## performs async query
   ## keeps one thread running queryTask until finished
   ##
   await self.semaphore.acquire()
-  without signal =? acquireSignal(), err:
-    return failure err
-  without nextSignal =? acquireSignal(), err:
-    return failure err
+  let signal = acquireSignal().get()
+  # without signal =? acquireSignal(), err:
+  #   return failure err
+  let nextSignal = acquireSignal().get()
+  # without nextSignal =? acquireSignal(), err:
+  #   return failure err
+  let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal)
+
+  proc iterDispose() {.async.} =
+    ctx.setCancelled()
+    await ctx[].nextSignal.fire()
+    discard ctx[].signal.close()
+    discard ctx[].nextSignal.close()
+    self.semaphore.release()
 
   try:
     let query = dbQuery(
@@ -311,16 +319,17 @@ method query*(self: ThreadDatastore,
       value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
 
     # setup initial queryTask
-    let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
-    dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
-      self.tp.spawn queryTask(ctx, ds, query, nextSignal)
-    await nextSignal.fire()
+    dispatchTaskWrap(self, signal):
+      self.tp.spawn queryTask(ctx, ds, query)
+    await ctx[].nextSignal.fire()
 
-    var
-      lock = newAsyncLock() # serialize querying under threads
-      iter = QueryIter.new()
+    var lock = newAsyncLock() # serialize querying under threads
+    var iter = QueryIter.new()
+    iter.dispose = proc (): Future[?!void] {.async.} =
+      iterDispose()
+      success()
 
-    proc next(): Future[?!QueryResponse] {.async.} =
+    iter.next = proc(): Future[?!QueryResponse] {.async.} =
       let ctx = ctx
       try:
         trace "About to query"
@@ -330,12 +339,11 @@ method query*(self: ThreadDatastore,
           return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
 
         await wait(ctx[].signal)
-
         if not ctx[].running:
           iter.finished = true
 
         defer:
-          await nextSignal.fire()
+          await ctx[].nextSignal.fire()
 
         if ctx[].res.isErr():
           return err(ctx[].res.error())
@@ -347,34 +355,25 @@ method query*(self: ThreadDatastore,
       except CancelledError as exc:
         trace "Cancelling thread future!", exc = exc.msg
         ctx.setCancelled()
-        discard ctx[].signal.close()
-        discard nextSignal.close()
-        self.semaphore.release()
+        await iterDispose() # todo: is this valid?
        raise exc
 
-    iter.next = next
     return success iter
   except CancelledError as exc:
     trace "Cancelling thread future!", exc = exc.msg
-    discard signal.close()
-    discard nextSignal.close()
-    self.semaphore.release()
+    ctx.setCancelled()
+    await iterDispose()
     raise exc
 
 proc new*[DB](self: type ThreadDatastore,
               db: DB,
               withLocks = static false,
               tp: Taskpool
-              ): ?!ThreadDatastore =
+             ): ?!ThreadDatastore[DB] =
   doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
 
-  when DB is SQLiteBackend[KeyId,DataBuffer]:
-    let backend = ThreadBackend(kind: Sqlite, sql: db)
-  else:
-    {.error: "unsupported backend: " & $typeof(db).}
-
-  success ThreadDatastore(
+  success ThreadDatastore[DB](
     tp: tp,
-    backend: backend,
+    backend: db,
     semaphore: AsyncSemaphore.new(tp.numThreads - 1)
   )
diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim
index da0df6dc..3508e9ee 100644
--- a/datastore/threads/threadresult.nim
+++ b/datastore/threads/threadresult.nim
@@ -1,5 +1,6 @@
 import std/atomics
 import std/options
+import std/locks
 
 import pkg/questionable/results
 import pkg/results
@@ -51,3 +52,24 @@ proc toRes*[T,S](res: ThreadResult[T],
     result.err res.error().toExc()
   else:
     result.ok m(res.get())
+
+type
+  MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool]
+
+proc init*(sig: var MutexSignal) =
+  sig.lock.initLock()
+  sig.cond.initCond()
+  sig.open = true
+
+proc wait*(sig: var MutexSignal) =
+  withLock(sig.lock):
+    wait(sig.cond, sig.lock)
+
+proc fire*(sig: var MutexSignal) =
+  withLock(sig.lock):
+    signal(sig.cond)
+
+proc close*(sig: var MutexSignal) =
+  if sig.open:
+    sig.lock.deinitLock()
+    sig.cond.deinitCond()
diff --git a/tests/datastore/backendCommonTests.nim b/tests/datastore/backendCommonTests.nim
new file mode 100644
index 00000000..f35fd328
--- /dev/null
+++ b/tests/datastore/backendCommonTests.nim
@@ -0,0 +1,307 @@
+
+template testBasicBackend*[K, V, DB](
+    ds: DB,
+    key: K,
+    bytes: V,
+    otherBytes: V,
+    batch: untyped,
+    extended = true
+): untyped =
+
+  test "put":
+    ds.put(key, bytes).tryGet()
+
+  test "get":
+    check:
+      ds.get(key).tryGet() == bytes
+
+  test "put update":
+    ds.put(key, otherBytes).tryGet()
+
+  test "get updated":
+    check:
+      ds.get(key).tryGet() == otherBytes
+
+  test "delete":
+    ds.delete(key).tryGet()
+
+  test "contains":
+    check key notin ds
+
+  test "put batch":
+
+    ds.put(batch).tryGet
+
+    for (k, v) in batch:
+      check: ds.has(k).tryGet
+
+  test "delete batch":
+    var keys: seq[K]
+    for (k, v) in batch:
+      keys.add(k)
+
+    ds.delete(keys).tryGet
+
+    for (k, v) in batch:
+      check: not ds.has(k).tryGet
+
+  test "handle missing key":
+    when K is KeyId:
+      let key = KeyId.new Key.init("/missing/key").tryGet().id()
+    elif K is string:
+      let key = $KeyId.new Key.init("/missing/key").tryGet().id()
+
+    expect(DatastoreKeyNotFound):
+      discard ds.get(key).tryGet() # non existing key
+
+template queryTests*(
+    dsNew: untyped,
+    key1, key2, key3: untyped,
+    val1, val2, val3: untyped,
+    extended = true
+) =
+
+  setup:
+    let
+      ds = dsNew()
+      key1 = key1
+      key2 = key2
+      key3 = key3
+      val1 = val1
+      val2 = val2
+      val3 = val3
+
+  test "Key should query all keys and all it's children":
+    let
+      q = dbQuery(key=key1, value=true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+      res = handle.queryIter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 3
+      res[0].key.get == key1
+      res[0].data == val1
+
+      res[1].key.get == key2
+      res[1].data == val2
+
+      res[2].key.get == key3
+      res[2].data == val3
+
+  test "query should cancel":
+    let
+      q = dbQuery(key= key1, value= true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+
+    var res: seq[DbQueryResponse[KeyId, DataBuffer]]
+    var cnt = 0
+    for item in handle.queryIter():
+      cnt.inc
+      res.insert(item.tryGet(), 0)
+      if cnt > 1:
+        handle.cancel = true
+
+    check:
+      handle.cancel == true
+      handle.closed == true
+      res.len == 2
+
+      res[0].key.get == key2
+      res[0].data == val2
+
+      res[1].key.get == key1
+      res[1].data == val1
+
+  test "Key should query all keys without values":
+    let
+      q = dbQuery(key= key1, value= false)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+    let
+      res = handle.queryIter().toSeq().mapIt(it.tryGet())
+
+    check:
+      res.len == 3
+      res[0].key.get == key1
+      res[0].data.len == 0
+
+      res[1].key.get == key2
+      res[1].data.len == 0
+
+      res[2].key.get == key3
+      res[2].data.len == 0
+
+
+  test "Key should not query parent":
+    let
+      q = dbQuery(key= key2, value= true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+    let
+      res = handle.queryIter().toSeq().mapIt(it.tryGet())
+      # ).filterIt(it.isOk).mapIt(it.tryGet())
+
+    check:
+      res.len == 2
+      res[0].key.get == key2
+      res[0].data == val2
+
+      res[1].key.get == key3
+      res[1].data == val3
+
+  test "Key should all list all keys at the same level":
+    let
+      queryKey = Key.init("/a").tryGet
+      q = dbQuery(key= key1, value= true)
+
+    ds.put(key1, val1).tryGet
+    ds.put(key2, val2).tryGet
+    ds.put(key3, val3).tryGet
+
+    var
+      handle = ds.query(q).tryGet
+      res = handle.queryIter().toSeq().mapIt(it.tryGet())
+
+    res.sort do (a, b: DbQueryResponse[KeyId, DataBuffer]) -> int:
+      cmp($a.key.get, $b.key.get)
+
+    check:
+      res.len == 3
+      res[0].key.get == key1
+      res[0].data == val1
+
+      res[1].key.get == key2
+      res[1].data == val2
+
+      res[2].key.get == key3
+      res[2].data == val3
+
+  if extended:
+    test "Should apply limit":
+      let
+        key = Key.init("/a").tryGet
+        q = dbQuery(key= key1, limit= 10, value= false)
+
+      for i in 0..<100:
+        let
+          key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
+          val = DataBuffer.new("val " & $i)
+
+        ds.put(key, val).tryGet
+
+      var
+        handle = ds.query(q).tryGet
+      let
+        res = handle.queryIter().toSeq().mapIt(it.tryGet())
+
+      check:
+        res.len == 10
+
+    test "Should not apply offset":
+      let
+        key = Key.init("/a").tryGet
+        keyId = KeyId.new $key
+        q = dbQuery(key= keyId, offset= 90)
+
+      for i in 0..<100:
+        let
+          key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
+          val = DataBuffer.new("val " & $i)
+
+        ds.put(key, val).tryGet
+
+      var
+        qr = ds.query(q)
+      # echo "RES: ", qr.repr
+
+      var
+        handle = ds.query(q).tryGet
+      let
+        res = handle.queryIter().toSeq().mapIt(it.tryGet())
+
+      # echo "RES: ", res.mapIt(it.key)
+      check:
+        res.len == 10
+
+    test "Should not apply offset and limit":
+      let
+        key = Key.init("/a").tryGet
+        keyId = KeyId.new $key
+        q = dbQuery(key= keyId, offset= 95, limit= 5)
+
+      for i in 0..<100:
+        let
+          key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
+          val = DataBuffer.new("val " & $i)
+
+        ds.put(key, val).tryGet
+
+      var
+        handle = ds.query(q).tryGet
+        res = handle.queryIter().toSeq().mapIt(it.tryGet())
+
+      check:
+        res.len == 5
+
+      for i in 0..
+      kvs.sort do (a, b: DbQueryResponse[KeyId, DataBuffer]) -> int:
+        cmp($a.key.get, $b.key.get)
+
+      kvs = kvs.reversed
+      var
+        handle = ds.query(q).tryGet
+        res = handle.queryIter().toSeq().mapIt(it.tryGet())
+
+      check:
+        res.len == 100
+
+      for i, r in res[1..^1]:
+        check:
+          res[i].key.get == kvs[i].key.get
+          res[i].data == kvs[i].data
\ No newline at end of file
diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim
index 460804b7..0bba6e45 100644
--- a/tests/datastore/querycommontests.nim
+++ b/tests/datastore/querycommontests.nim
@@ -1,11 +1,4 @@
-import std/options
-import std/sequtils
-from std/algorithm import sort, reversed
-
-import pkg/asynctest
-import pkg/chronos
-import pkg/stew/results
-import pkg/stew/byteutils
+
 
 import pkg/datastore
diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim
index 1309dcd3..e52d90b1 100644
--- a/tests/datastore/sql/testsqliteds.nim
+++ b/tests/datastore/sql/testsqliteds.nim
@@ -11,62 +11,8 @@ import pkg/stew/byteutils
 import pkg/datastore/sql/sqliteds
 import pkg/datastore/key
 
-import ../dscommontests
+import ../backendCommonTests
 
-proc testBasic[K, V](
-  ds: SQLiteBackend[K,V],
-  key: K,
-  bytes: V,
-  otherBytes: V,
-  batch: seq[DbBatchEntry[K, V]],
-  extended = true
-) =
-
-  test "put":
-    ds.put(key, bytes).tryGet()
-
-  test "get":
-    check:
-      ds.get(key).tryGet() == bytes
-
-  test "put update":
-    ds.put(key, otherBytes).tryGet()
-
-  test "get updated":
-    check:
-      ds.get(key).tryGet() == otherBytes
-
-  test "delete":
-    ds.delete(key).tryGet()
-
-  test "contains":
-    check key notin ds
-
-  test "put batch":
-
-    ds.put(batch).tryGet
-
-    for (k, v) in batch:
-      check: ds.has(k).tryGet
-
-  test "delete batch":
-    var keys: seq[K]
-    for (k, v) in batch:
-      keys.add(k)
-
-    ds.delete(keys).tryGet
-
-    for (k, v) in batch:
-      check: not ds.has(k).tryGet
-
-  test "handle missing key":
-    when K is KeyId:
-      let key = KeyId.new Key.init("/missing/key").tryGet().id()
-    elif K is string:
-      let key = $KeyId.new Key.init("/missing/key").tryGet().id()
-
-    expect(DatastoreKeyNotFound):
-      discard ds.get(key).tryGet() # non existing key
 
 suite "Test Basic SQLiteDatastore":
   let
@@ -84,7 +30,7 @@ suite "Test Basic SQLiteDatastore":
   suiteTeardown:
     ds.close().tryGet()
 
-  testBasic(ds, key, bytes, otherBytes, batch)
+  testBasicBackend(ds, key, bytes, otherBytes, batch)
 
 suite "Test DataBuffer SQLiteDatastore":
   let
@@ -102,249 +48,18 @@ suite "Test DataBuffer SQLiteDatastore":
   suiteTeardown:
     ds.close().tryGet()
 
-  testBasic(ds, key, bytes, otherBytes, batch)
+  testBasicBackend(ds, key, bytes, otherBytes, batch)
 
 suite "queryTests":
-  setup:
-    let
-      ds = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
-      key1 = KeyId.new "/a"
-      key2 = KeyId.new "/a/b"
-      key3 = KeyId.new "/a/b/c"
-      val1 = DataBuffer.new "value for 1"
-      val2 = DataBuffer.new "value for 2"
-      val3 = DataBuffer.new "value for 3"
-
-  test "Key should query all keys and all it's children":
-    let
-      q = dbQuery(key=key1, value=true)
-
-    ds.put(key1, val1).tryGet
-    ds.put(key2, val2).tryGet
-    ds.put(key3, val3).tryGet
-
-    var
-      handle = ds.query(q).tryGet
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    check:
-      res.len == 3
-      res[0].key.get == key1
-      res[0].data == val1
-
-      res[1].key.get == key2
-      res[1].data == val2
-
-      res[2].key.get == key3
-      res[2].data == val3
-
-  test "query should cancel":
-    let
-      q = dbQuery(key= key1, value= true)
-
-    ds.put(key1, val1).tryGet
-    ds.put(key2, val2).tryGet
-    ds.put(key3, val3).tryGet
-
-    var
-      handle = ds.query(q).tryGet
-
-    var res: seq[DbQueryResponse[KeyId, DataBuffer]]
-    var cnt = 0
-    for item in handle.iter():
-      cnt.inc
-      res.insert(item.tryGet(), 0)
-      if cnt > 1:
-        handle.cancel = true
-
-    check:
-      handle.cancel == true
-      handle.closed == true
-      res.len == 2
-
-      res[0].key.get == key2
-      res[0].data == val2
-
-      res[1].key.get == key1
-      res[1].data == val1
-
-  test "Key should query all keys without values":
-    let
-      q = dbQuery(key= key1, value= false)
-
-    ds.put(key1, val1).tryGet
-    ds.put(key2, val2).tryGet
-    ds.put(key3, val3).tryGet
-
-    var
-      handle = ds.query(q).tryGet
-    let
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    check:
-      res.len == 3
-      res[0].key.get == key1
-      res[0].data.len == 0
-
-      res[1].key.get == key2
-      res[1].data.len == 0
-
-      res[2].key.get == key3
-      res[2].data.len == 0
-
-
-  test "Key should not query parent":
-    let
-      q = dbQuery(key= key2, value= true)
-
-    ds.put(key1, val1).tryGet
-    ds.put(key2, val2).tryGet
-    ds.put(key3, val3).tryGet
-
-    var
-      handle = ds.query(q).tryGet
-    let
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    check:
-      res.len == 2
-      res[0].key.get == key2
-      res[0].data == val2
-
-      res[1].key.get == key3
-      res[1].data == val3
-
-  test "Key should all list all keys at the same level":
-    let
-      queryKey = Key.init("/a").tryGet
-      q = dbQuery(key= key1, value= true)
-
-    ds.put(key1, val1).tryGet
-    ds.put(key2, val2).tryGet
-    ds.put(key3, val3).tryGet
-
-    var
-      handle = ds.query(q).tryGet
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    res.sort do (a, b: DbQueryResponse[KeyId, DataBuffer]) -> int:
-      cmp($a.key.get, $b.key.get)
-
-    check:
-      res.len == 3
-      res[0].key.get == key1
-      res[0].data == val1
-
-      res[1].key.get == key2
-      res[1].data == val2
-
-      res[2].key.get == key3
-      res[2].data == val3
-
-  test "Should apply limit":
-    let
-      key = Key.init("/a").tryGet
-      q = dbQuery(key= key1, limit= 10, value= false)
-
-    for i in 0..<100:
-      let
-        key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
-        val = DataBuffer.new("val " & $i)
-
-      ds.put(key, val).tryGet
-
-    var
-      handle = ds.query(q).tryGet
-    let
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    check:
-      res.len == 10
-
-  test "Should not apply offset":
-    let
-      key = Key.init("/a").tryGet
-      keyId = KeyId.new $key
-      q = dbQuery(key= keyId, offset= 90)
-
-    for i in 0..<100:
-      let
-        key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
-        val = DataBuffer.new("val " & $i)
-
-      ds.put(key, val).tryGet
-
-    var
-      qr = ds.query(q)
-    # echo "RES: ", qr.repr
-
-    var
-      handle = ds.query(q).tryGet
-    let
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    # echo "RES: ", res.mapIt(it.key)
-    check:
-      res.len == 10
-
-  test "Should not apply offset and limit":
-    let
-      key = Key.init("/a").tryGet
-      keyId = KeyId.new $key
-      q = dbQuery(key= keyId, offset= 95, limit= 5)
-
-    for i in 0..<100:
-      let
-        key = KeyId.new $Key.init(key, Key.init("/" & $i).tryGet).tryGet
-        val = DataBuffer.new("val " & $i)
-
-      ds.put(key, val).tryGet
-
-    var
-      handle = ds.query(q).tryGet
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    check:
-      res.len == 5
-
-    for i in 0..
-    kvs.sort do (a, b: DbQueryResponse[KeyId, DataBuffer]) -> int:
-      cmp($a.key.get, $b.key.get)
-
-    kvs = kvs.reversed
-    var
-      handle = ds.query(q).tryGet
-      res = handle.iter().toSeq().mapIt(it.tryGet())
-
-    check:
-      res.len == 100
-
-    for i, r in res[1..^1]:
-      check:
-        res[i].key.get == kvs[i].key.get
-        res[i].data == kvs[i].data
+  let
+    dsNew = proc(): SQLiteBackend[KeyId, DataBuffer] =
+      newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
+    key1 = KeyId.new "/a"
+    key2 = KeyId.new "/a/b"
+    key3 = KeyId.new "/a/b/c"
+    val1 = DataBuffer.new "value for 1"
+    val2 = DataBuffer.new "value for 2"
+    val3 = DataBuffer.new "value for 3"
+
+  queryTests(dsNew, key1, key2, key3, val1, val2, val3, extended=true)
diff --git a/tests/datastore/testfsds.nim b/tests/datastore/testfsds.nim
index 26336772..1d954140 100644
--- a/tests/datastore/testfsds.nim
+++ b/tests/datastore/testfsds.nim
@@ -3,15 +3,44 @@ import std/sequtils
 import std/os
 from std/algorithm import sort, reversed
 
-import pkg/asynctest
+import pkg/unittest2
 import pkg/chronos
 import pkg/stew/results
 import pkg/stew/byteutils
 
 import pkg/datastore/fsds
+import pkg/datastore/key
+import pkg/datastore/backend
+
+import ./backendCommonTests
 
-import ./dscommontests
-import ./querycommontests
+
+suite "Test Basic FSDatastore":
+  let
+    path = currentSourcePath() # get this file's name
+    basePath = "tests_data"
+    basePathAbs = path.parentDir / basePath
+    keyFull = Key.init("/a/b").tryGet()
+    key = KeyId.new keyFull.id()
+    bytes = DataBuffer.new "some bytes"
+    otherBytes = DataBuffer.new "some other bytes".toBytes
+
+  var batch: seq[tuple[key: KeyId, data: DataBuffer]]
+  for k in 0..<100:
+    let kk = Key.init($keyFull, $k).tryGet().id()
+    batch.add( (KeyId.new kk, DataBuffer.new @[k.byte]) )
+
+  removeDir(basePathAbs)
+  require(not dirExists(basePathAbs))
+  createDir(basePathAbs)
+
+  var
+    fsStore = newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 3).tryGet()
+
+  testBasicBackend(fsStore, key, bytes, otherBytes, batch)
+
+  removeDir(basePathAbs)
+  require(not dirExists(basePathAbs))
 
 suite "Test Basic FSDatastore":
   let
@@ -22,21 +51,22 @@ suite "Test Basic FSDatastore":
     bytes = "some bytes".toBytes
     otherBytes = "some other bytes".toBytes
 
-  var
-    fsStore: FSDatastore
+  var batch: seq[tuple[key: Key, data: seq[byte]]]
+  for k in 0..<100:
+    let kk = Key.init($key, $k).tryGet()
+    batch.add( (kk, @[k.byte]) )
 
-  setupAll:
-    removeDir(basePathAbs)
-    require(not dirExists(basePathAbs))
-    createDir(basePathAbs)
+  removeDir(basePathAbs)
+  require(not dirExists(basePathAbs))
+  createDir(basePathAbs)
 
-    fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
+  var
+    fsStore = newFSDatastore[Key, seq[byte]](root = basePathAbs, depth = 3).tryGet()
 
-  teardownAll:
-    removeDir(basePathAbs)
-    require(not dirExists(basePathAbs))
+  testBasicBackend(fsStore, key, bytes, otherBytes, batch)
 
-  basicStoreTests(fsStore, key, bytes, otherBytes)
+  removeDir(basePathAbs)
+  require(not dirExists(basePathAbs))
 
 suite "Test Misc FSDatastore":
   let
@@ -56,7 +86,7 @@ suite "Test Misc FSDatastore":
 
   test "Test validDepth()":
     let
-      fs = FSDatastore.new(root = "/", depth = 3).tryGet()
+      fs = newFSDatastore[Key, seq[byte]](root = basePathAbs, depth = 3).tryGet()
       invalid = Key.init("/a/b/c/d").tryGet()
       valid = Key.init("/a/b/c").tryGet()
 
@@ -66,40 +96,40 @@ suite "Test Misc FSDatastore":
 
   test "Test invalid key (path) depth":
     let
-      fs = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
+      fs = newFSDatastore[Key, seq[byte]](root = basePathAbs, depth = 3).tryGet()
      key = Key.init("/a/b/c/d").tryGet()
 
     check:
-      (await fs.put(key, bytes)).isErr
-      (await fs.get(key)).isErr
-      (await fs.delete(key)).isErr
-      (await fs.has(key)).isErr
+      (fs.put(key, bytes)).isErr
+      (fs.get(key)).isErr
+      (fs.delete(key)).isErr
+      (fs.has(key)).isErr
 
   test "Test valid key (path) depth":
     let
-      fs = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
+      fs = newFSDatastore[Key, seq[byte]](root = basePathAbs, depth = 3).tryGet()
       key = Key.init("/a/b/c").tryGet()
 
     check:
-      (await fs.put(key, bytes)).isOk
-      (await fs.get(key)).isOk
-      (await fs.delete(key)).isOk
-      (await fs.has(key)).isOk
+      (fs.put(key, bytes)).isOk
+      (fs.get(key)).isOk
+      (fs.delete(key)).isOk
+      (fs.has(key)).isOk
 
   test "Test key cannot write outside of root":
     let
-      fs = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
+      fs = newFSDatastore[Key, seq[byte]](root = basePathAbs, depth = 3).tryGet()
       key = Key.init("/a/../../c").tryGet()
 
     check:
-      (await fs.put(key, bytes)).isErr
-      (await fs.get(key)).isErr
-      (await fs.delete(key)).isErr
-      (await fs.has(key)).isErr
+      (fs.put(key, bytes)).isErr
+      (fs.get(key)).isErr
+      (fs.delete(key)).isErr
+      (fs.has(key)).isErr
 
   test "Test key cannot convert to invalid path":
     let
-      fs = FSDatastore.new(root = basePathAbs).tryGet()
+      fs = newFSDatastore[Key, seq[byte]](root = basePathAbs).tryGet()
 
     for c in invalidFilenameChars:
       if c == ':': continue
@@ -109,30 +139,57 @@ suite "Test Misc FSDatastore":
        key = Key.init("/" & c).tryGet()
 
       check:
-        (await fs.put(key, bytes)).isErr
-        (await fs.get(key)).isErr
-        (await fs.delete(key)).isErr
-        (await fs.has(key)).isErr
+        (fs.put(key, bytes)).isErr
+        (fs.get(key)).isErr
+        (fs.delete(key)).isErr
+        (fs.has(key)).isErr
 
-suite "Test Query":
-  let
-    path = currentSourcePath() # get this file's name
-    basePath = "tests_data"
-    basePathAbs = path.parentDir / basePath
-
-  var
-    ds: FSDatastore
+# suite "Test Query":
+#   let
+#     path = currentSourcePath() # get this file's name
+#     basePath = "tests_data"
+#     basePathAbs = path.parentDir / basePath
+
+#   var
+#     ds: FSDatastore
 
-  setup:
-    removeDir(basePathAbs)
-    require(not dirExists(basePathAbs))
-    createDir(basePathAbs)
+#   setup:
+#     removeDir(basePathAbs)
+#     require(not dirExists(basePathAbs))
+#     createDir(basePathAbs)
 
-    ds = FSDatastore.new(root = basePathAbs, depth = 5).tryGet()
+#     ds = FSDatastore.new(root = basePathAbs, depth = 5).tryGet()
 
-  teardown:
+#   teardown:
 
-    removeDir(basePathAbs)
-    require(not dirExists(basePathAbs))
+#     removeDir(basePathAbs)
+#     require(not dirExists(basePathAbs))
 
-  queryTests(ds, false)
+#   queryTests(ds, false)
+
+suite "queryTests":
+
+  let
+    path = currentSourcePath() # get this file's name
+    basePath = "tests_data"
+    basePathAbs = path.parentDir / basePath
+
+  removeDir(basePathAbs)
+  require(not dirExists(basePathAbs))
+  createDir(basePathAbs)
+
+  let
+    fsNew = proc(): FSDatastore[KeyId, DataBuffer] =
+      newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 3).tryGet()
+    key1 = KeyId.new "/a"
+    key2 = KeyId.new "/a/b"
+    key3 = KeyId.new "/a/b/c"
+    val1 = DataBuffer.new "value for 1"
+    val2 = DataBuffer.new "value for 2"
+    val3 = DataBuffer.new "value for 3"
+
+  queryTests(fsNew, key1, key2, key3, val1, val2, val3, extended=false)
+
+  removeDir(basePathAbs)
+  require(not dirExists(basePathAbs))
diff --git a/tests/datastore/testmountedds.nim b/tests/datastore/testmountedds.nim
index fbe0dfdc..4e6a7af1 100644
--- a/tests/datastore/testmountedds.nim
+++ b/tests/datastore/testmountedds.nim
@@ -28,7 +28,8 @@ suite "Test Basic Mounted Datastore":
 
   var
     sql: SQLiteDatastore
-    fs: FSDatastore
+    sql2: SQLiteDatastore
+    # fs: FSDatastore
     mountedDs: MountedDatastore
 
   setupAll:
@@ -37,10 +38,11 @@ suite "Test Basic Mounted Datastore":
     createDir(rootAbs)
 
     sql = SQLiteDatastore.new(Memory).tryGet
-    fs = FSDatastore.new(rootAbs, depth = 5).tryGet
+    sql2 = SQLiteDatastore.new(Memory).tryGet
+    # fs = FSDatastore.new(rootAbs, depth = 5).tryGet
     mountedDs = MountedDatastore.new({
       sqlKey: Datastore(sql),
-      fsKey: Datastore(fs)}.toTable)
+      fsKey: Datastore(sql2)}.toTable)
       .tryGet
 
   teardownAll:
diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim
index 14a41b69..db20c819 100644
--- a/tests/datastore/testthreadproxyds.nim
+++ b/tests/datastore/testthreadproxyds.nim
@@ -14,240 +14,236 @@ import pkg/taskpools
 import pkg/questionable/results
 import pkg/chronicles
 import pkg/threading/smartptrs
+import pkg/threading/atomics
 
-import pkg/datastore/sql/sqliteds
 import pkg/datastore/fsds
+import pkg/datastore/sql/sqliteds
 import pkg/datastore/threads/threadproxyds {.all.}
 
 import ./dscommontests
 import ./querycommontests
 
-const NumThreads = 20 # IO threads aren't attached to CPU count
+const
+  NumThreads = 20 # IO threads aren't attached to CPU count
+  ThreadTestLoops {.intdefine.} = 1
+  N = ThreadTestLoops
+  ThreadTestInnerLoops {.intdefine.} = 1
+  M = ThreadTestInnerLoops
 
-suite "Test Basic ThreadProxyDatastore":
-  var
-    sqlStore: SQLiteBackend[KeyId,DataBuffer]
-    ds: ThreadDatastore
-    taskPool: Taskpool
-    key = Key.init("/a").tryGet()
-    data = "some bytes".toBytes
+var
+  taskPool: Taskpool = Taskpool.new(NumThreads)
 
-  setupAll:
-    sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
-    taskPool = Taskpool.new(NumThreads)
-    ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
+for i in 1..N:
+  suite "Test Basic ThreadDatastore with SQLite " & $i:
 
-  teardownAll:
-    echo "teardown done"
-
-  test "check put":
-    echo "\n\n=== put ==="
-    let res1 = await ds.put(key, data)
-    echo "res1: ", res1.repr
-    check res1.isOk
-
-  test "check get":
-    echo "\n\n=== get ==="
-    echo "get send key: ", key.repr
-    let res2 = await ds.get(key)
-    echo "get key post: ", key.repr
-    echo "get res2: ", res2.repr
-    echo res2.get() == data
-    var val = ""
-    for c in res2.get():
-      val &= char(c)
-    echo "get res2: ", $val
-
-suite "Test Basic ThreadDatastore with SQLite":
-
-  var
-    sqlStore: SQLiteBackend[KeyId,DataBuffer]
-    ds: ThreadDatastore
-    taskPool: Taskpool
-    key = Key.init("/a/b").tryGet()
-    bytes = "some bytes".toBytes
-    otherBytes = "some other bytes".toBytes
+    var
+      sqlStore: SQLiteBackend[KeyId, DataBuffer]
+      ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
+      key = Key.init("/a/b").tryGet()
+      bytes = "some bytes".toBytes
+      otherBytes = "some other bytes".toBytes
 
-  setupAll:
-    sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
-    taskPool = Taskpool.new(NumThreads)
-    ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
+    setupAll:
+      sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
+      ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
 
-  teardown:
-    GC_fullCollect()
+    teardown:
+      GC_fullCollect()
 
-  teardownAll:
-    (await ds.close()).tryGet()
-    taskPool.shutdown()
+    teardownAll:
+      (await ds.close()).tryGet()
 
-  basicStoreTests(ds, key, bytes, otherBytes)
+    for i in 1..M:
+      basicStoreTests(ds, key, bytes, otherBytes)
+      GC_fullCollect()
 
-suite "Test Query ThreadDatastore with SQLite":
 
-  var
-    sqlStore: SQLiteBackend[KeyId,DataBuffer]
-    ds: ThreadDatastore
-    taskPool: Taskpool
-    key = Key.init("/a/b").tryGet()
-    bytes = "some bytes".toBytes
-    otherBytes = "some other bytes".toBytes
+for i in 1..N:
+  suite "Test Query ThreadDatastore with SQLite " & $i:
 
-  setup:
-    sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
-    taskPool = Taskpool.new(NumThreads)
-    ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
+    var
+      sqlStore: SQLiteBackend[KeyId, DataBuffer]
+      # taskPool: Taskpool
+      ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
 
-  teardown:
-    GC_fullCollect()
-    (await ds.close()).tryGet()
-    taskPool.shutdown()
+    setup:
+      sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
+      # taskPool = Taskpool.new(NumThreads)
+      ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
 
-  queryTests(ds, true)
+    teardown:
+      GC_fullCollect()
+
+      (await ds.close()).tryGet()
+
+    for i in 1..M:
+      queryTests(ds, true)
+      GC_fullCollect()
 
 suite "Test Basic ThreadDatastore with fsds":
   let
     path = currentSourcePath() # get this file's name
     basePath = "tests_data"
     basePathAbs = path.parentDir / basePath
     key = Key.init("/a/b").tryGet()
     bytes = "some bytes".toBytes
     otherBytes = "some other bytes".toBytes
 
+  var
+    fsStore: FSDatastore[KeyId, DataBuffer]
+    ds: ThreadDatastore[FSDatastore[KeyId, DataBuffer]]
+
   setupAll:
-    sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
-    taskPool = Taskpool.new(NumThreads)
-    ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
+    removeDir(basePathAbs)
+    require(not dirExists(basePathAbs))
+    createDir(basePathAbs)
+
+    fsStore = newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 3).tryGet()
+    ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet()
 
   teardown:
     GC_fullCollect()
 
   teardownAll:
     (await ds.close()).tryGet()
-    taskPool.shutdown()
+
+    removeDir(basePathAbs)
+    require(not dirExists(basePathAbs))
 
   basicStoreTests(ds, key, bytes, otherBytes)
 
-suite "Test Query ThreadDatastore with SQLite":
+suite "Test Query ThreadDatastore with fsds":
+  let
+    path = currentSourcePath() # get this file's name
+    basePath = "tests_data"
+    basePathAbs = path.parentDir / basePath
 
   var
-    sqlStore: SQLiteBackend[KeyId,DataBuffer]
-    ds: ThreadDatastore
-    taskPool: Taskpool
-    key = Key.init("/a/b").tryGet()
-    bytes = "some bytes".toBytes
-    otherBytes = "some other bytes".toBytes
+    fsStore: FSDatastore[KeyId, DataBuffer]
+    ds: ThreadDatastore[FSDatastore[KeyId, DataBuffer]]
 
   setup:
-    sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
-    taskPool = Taskpool.new(NumThreads)
-    ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
+    removeDir(basePathAbs)
+    require(not dirExists(basePathAbs))
+    createDir(basePathAbs)
+
+    fsStore = newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 5).tryGet()
+    ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet()
 
   teardown:
     GC_fullCollect()
-    (await ds.close()).tryGet()
-    taskPool.shutdown()
-
-  queryTests(ds, true)
-
-# suite "Test Basic ThreadDatastore with fsds":
-#   let
-#     path = currentSourcePath() # get this file's name
-#     basePath = "tests_data"
-#     basePathAbs = path.parentDir / basePath
-#     key = Key.init("/a/b").tryGet()
-#     bytes = "some bytes".toBytes
-#     otherBytes = "some other bytes".toBytes
-
-#   var
-#     fsStore: FSDatastore
-#     ds: ThreadDatastore
-#     taskPool: Taskpool
-
-#   setupAll:
-#     removeDir(basePathAbs)
-#     require(not dirExists(basePathAbs))
-#     createDir(basePathAbs)
-
-#     fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
-#     taskPool = Taskpool.new(NumThreads)
-#     ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
-
-#   teardown:
-#     GC_fullCollect()
-
-#   teardownAll:
-#     (await ds.close()).tryGet()
-#     taskPool.shutdown()
-
-#     removeDir(basePathAbs)
-#     require(not dirExists(basePathAbs))
-
-#   basicStoreTests(fsStore, key, bytes, otherBytes)
-
-# suite "Test Query ThreadDatastore with fsds":
-#   let
-#     path = currentSourcePath() # get this file's name
-#     basePath = "tests_data"
-#     basePathAbs = path.parentDir / basePath
-
-#   var
-#     fsStore: FSDatastore
-#     ds: ThreadDatastore
-#     taskPool: Taskpool
-
-#   setup:
-#     removeDir(basePathAbs)
-#     require(not dirExists(basePathAbs))
-#     createDir(basePathAbs)
-
-#     fsStore = FSDatastore.new(root = basePathAbs, depth = 5).tryGet()
-#     taskPool = Taskpool.new(NumThreads)
-#     ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
-
-#   teardown:
-#     GC_fullCollect()
-#     (await ds.close()).tryGet()
-#     taskPool.shutdown()
-
-#     removeDir(basePathAbs)
-#     require(not dirExists(basePathAbs))
-
-#   queryTests(ds, false)
-
-# suite "Test ThreadDatastore cancelations":
-#   var
-#     sqlStore: SQLiteBackend[KeyId,DataBuffer]
-#     ds: ThreadDatastore
-#     taskPool: Taskpool
-
-#   privateAccess(ThreadDatastore) # expose private fields
-#   privateAccess(TaskCtx) # expose private fields
-
-#   setupAll:
-#     sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
-#     taskPool = Taskpool.new(NumThreads)
-#     ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
-
-#   teardown:
-#     GC_fullCollect() # run full collect after each test
-
-#   teardownAll:
-#     (await ds.close()).tryGet()
-#     taskPool.shutdown()
-
-  # test "Should monitor signal and cancel":
-  #   var
-  #     signal = ThreadSignalPtr.new().tryGet()
-  #     res = ThreadResult[void]()
-  #     ctx = newSharedPtr(TaskCtxObj[void](signal: signal))
-  #     fut = newFuture[void]("signalMonitor")
-  #     threadArgs = (addr ctx, addr fut)
-  #     thread: Thread[type threadArgs]
-
-  #   proc threadTask(args: type threadArgs) =
-  #     var (ctx, fut) = args
-  #     proc asyncTask() {.async.} =
-  #       let
-  #         monitor = signalMonitor(ctx, fut[])
-
-  #       await monitor
-
-  #     waitFor asyncTask()
-
-  #   createThread(thread, threadTask, threadArgs)
-  #   ctx.cancelled = true
-  #   check: ctx.signal.fireSync.tryGet
-
-  #   joinThreads(thread)
-
-  #   check: fut.cancelled
-  #   check: ctx.signal.close().isOk
-  #   fut = nil
-
-  # test "Should monitor and not cancel":
-  #   var
-  #     signal = ThreadSignalPtr.new().tryGet()
-  #     res = ThreadResult[void]()
-  #     ctx = TaskCtx[void](
-  #       ds: sqlStore,
-  #       res: addr res,
-  #       signal: signal)
-  #     fut = newFuture[void]("signalMonitor")
-  #     threadArgs = (addr ctx, addr fut)
-  #     thread: Thread[type threadArgs]
-
-  #   proc threadTask(args: type threadArgs) =
-  #     var (ctx, fut) = args
-  #     proc asyncTask() {.async.} =
-  #       let
-  #         monitor = signalMonitor(ctx, fut[])
-
-  #       await monitor
-
-  #     waitFor asyncTask()
-
-  #   createThread(thread, threadTask, threadArgs)
-  #   ctx.cancelled = false
-  #   check: ctx.signal.fireSync.tryGet
-
-  #   joinThreads(thread)
-
-  #   check: not fut.cancelled
-  #   check: ctx.signal.close().isOk
-  #   fut = nil
+
+    removeDir(basePathAbs)
+    require(not dirExists(basePathAbs))
+
+  queryTests(ds, false)
+
+for i in 1..N:
+  suite "Test ThreadDatastore cancelations":
+    var
+      sqlStore: SQLiteBackend[KeyId,DataBuffer]
+      sds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
+
+    privateAccess(ThreadDatastore) # expose private fields
+    privateAccess(TaskCtx) # expose private fields
+
+    setupAll:
+      sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
+      sds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
+
+    teardown:
+      GC_fullCollect() # run full collect after each test
+
+    test "Should monitor signal and cancel":
+      var
+        signal = ThreadSignalPtr.new().tryGet()
+
+      proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} =
+        executeTask(ctx):
+          (?!bool).ok(true)
+
+      let ctx = newTaskCtx(bool, signal=signal)
+      ctx[].cancelled = true
+      dispatchTask(sds, signal):
+        sds.tp.spawn cancelTestTask(ctx)
+
+      check:
+        ctx[].res.isErr == true
+        ctx[].cancelled == true
+        ctx[].running == false
+
+    test "Should cancel future":
+
+      var
+        signal = ThreadSignalPtr.new().tryGet()
+        ms {.global.}: MutexSignal
+        flag {.global.}: Atomic[bool]
+        futFreed {.global.}: Atomic[bool]
+        ready {.global.}: Atomic[bool]
+
+      ms.init()
+
+      type
+        FutTestObj = object
+          val: int
+        TestValue = object
+        ThreadTestInt = (TestValue, )
+
+      proc `=destroy`(obj: var TestValue) =
+        # echo "destroy TestObj!"
+        flag.store(true)
+
+      proc `=destroy`(obj: var FutTestObj) =
+        # echo "destroy FutTestObj!"
+        futFreed.store(true)
+
+      proc wait(flag: var Atomic[bool], name = "task") =
+        # echo "wait for " & name & " to be ready..."
+        # defer: echo ""
+        for i in 1..100:
+          # stdout.write(".")
+          if flag.load() == true:
+            return
+          os.sleep(10)
+        raise newException(Defect, "timeout")
+
+      proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} =
+        executeTask(ctx):
+          # echo "task:exec"
+          discard ctx[].signal.fireSync()
+          ready.store(true)
+          ms.wait()
+          echo "task context memory: ", ctx[]
+          (?!ThreadTestInt).ok(default(ThreadTestInt))
+
+      proc runTestTask() {.async.} =
+        let obj = FutTestObj(val: 42)
+        await sleepAsync(1.milliseconds)
+        try:
+          let ctx = newTaskCtx(ThreadTestInt, signal=signal)
+          dispatchTask(sds, signal):
+            sds.tp.spawn errorTestTask(ctx)
+          ready.wait()
+          # echo "raise error"
+          raise newException(ValueError, "fake error")
+        finally:
+          # echo "fut FutTestObj: ", obj
+          assert obj.val == 42 # need to force future to keep ref here
+      try:
+        block:
+          await runTestTask()
+      except CatchableError as exc:
+        # echo "caught: ", $exc
+        discard
+      finally:
+        # echo "finish"
+        check ready.load() == true
+        GC_fullCollect()
+        futFreed.wait("futFreed")
+        echo "future freed it's mem!"
+        check futFreed.load() == true
+
+        ms.fire()
+        flag.wait("flag")
+        check flag.load() == true
diff --git a/tests/datastore/testtieredds.nim b/tests/datastore/testtieredds.nim
index 4ea76a74..4186cd51 100644
--- a/tests/datastore/testtieredds.nim
+++ b/tests/datastore/testtieredds.nim
@@ -23,7 +23,8 @@ suite "Test Basic Tired Datastore":
 
   var
     ds1: SQLiteDatastore
-    ds2: FSDatastore
+    ds2: SQLiteDatastore
+    # ds2: FSDatastore
     tiredDs: TieredDatastore
 
   setupAll:
@@ -32,8 +33,9 @@ suite "Test Basic Tired Datastore":
     createDir(rootAbs)
 
     ds1 = SQLiteDatastore.new(Memory).tryGet
-    ds2 = FSDatastore.new(rootAbs, depth = 5).tryGet
-    tiredDs = TieredDatastore.new(@[ds1, ds2]).tryGet
+    ds2 = SQLiteDatastore.new(Memory).tryGet
+    # ds2 = FSDatastore.new(rootAbs, depth = 5).tryGet
+    tiredDs = TieredDatastore.new(@[Datastore ds1, ds2]).tryGet
 
   teardownAll:
     removeDir(rootAbs)
@@ -52,14 +54,16 @@ suite "TieredDatastore":
 
   var
     ds1: SQLiteDatastore
-    ds2: FSDatastore
+    ds2: SQLiteDatastore
+    # ds2: FSDatastore
 
   setup:
     removeDir(rootAbs)
     require(not dirExists(rootAbs))
     createDir(rootAbs)
     ds1 = SQLiteDatastore.new(Memory).get
-    ds2 = FSDatastore.new(rootAbs, depth = 5).get
+    ds2 = SQLiteDatastore.new(Memory).get
+    # ds2 = FSDatastore.new(rootAbs, depth = 5).get
 
   teardown:
     if not ds1.isNil:
@@ -76,17 +80,17 @@ suite "TieredDatastore":
       TieredDatastore.new([]).isErr
       TieredDatastore.new(@[]).isErr
       TieredDatastore.new(ds1, ds2).isOk
-      TieredDatastore.new([ds1, ds2]).isOk
-      TieredDatastore.new(@[ds1, ds2]).isOk
+      TieredDatastore.new([Datastore ds1, ds2]).isOk
+      TieredDatastore.new(@[Datastore ds1, ds2]).isOk
 
   test "accessors":
     let
-      stores = @[ds1, ds2]
+      stores = @[Datastore ds1, ds2]
 
     check:
       TieredDatastore.new(ds1, ds2).tryGet.stores == stores
-      TieredDatastore.new([ds1, ds2]).tryGet.stores == stores
-      TieredDatastore.new(@[ds1, ds2]).tryGet.stores == stores
+      TieredDatastore.new([Datastore ds1, ds2]).tryGet.stores == stores
+      TieredDatastore.new(@[Datastore ds1, ds2]).tryGet.stores == stores
 
   test "put":
     let
diff --git a/tests/testall.nim b/tests/testall.nim
index 83bdf381..47e2960a 100644
--- a/tests/testall.nim
+++ b/tests/testall.nim
@@ -1,8 +1,8 @@
 import
   ./datastore/testkey,
   ./datastore/testdatastore,
-  ./datastore/testfsds,
   ./datastore/testsql,
+  ./datastore/testfsds,
   ./datastore/testtieredds,
   ./datastore/testmountedds,
   ./datastore/testdatabuffer,