diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a9b89081..0e2e106d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: cache_nonce: [ 1 ] - nim_version: [ 1.2.18, 1.6.6 ] + nim_version: [ 1.6.14 ] platform: - { icon: 🐧, diff --git a/.gitignore b/.gitignore index 1a87a994..de000360 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ coverage datastore.nims nimcache TODO +nim.cfg +nimble.develop +nimble.paths diff --git a/config.nims b/config.nims index 052a576c..ed3a5315 100644 --- a/config.nims +++ b/config.nims @@ -10,3 +10,7 @@ when (NimMajor, NimMinor) == (1, 2): when (NimMajor, NimMinor) > (1, 2): switch("hint", "XCannotRaiseY:off") +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +# end Nimble config diff --git a/datastore.nim b/datastore.nim index 6d43a209..9e42c472 100644 --- a/datastore.nim +++ b/datastore.nim @@ -3,5 +3,6 @@ import ./datastore/fsds import ./datastore/sql import ./datastore/mountedds import ./datastore/tieredds +import ./datastore/threads/threadproxyds -export datastore, fsds, mountedds, tieredds, sql +export datastore, fsds, mountedds, tieredds, sql, threadproxyds diff --git a/datastore.nimble b/datastore.nimble index 6252c6d1..3b5b28ef 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -6,14 +6,18 @@ author = "Status Research & Development GmbH" description = "Simple, unified API for multiple data stores" license = "Apache License 2.0 or MIT" -requires "nim >= 1.2.0", +requires "nim >= 1.6.14", "asynctest >= 0.3.1 & < 0.4.0", - "chronos", + "chronos#0277b65be2c7a365ac13df002fba6e172be55537", "questionable >= 0.10.3 & < 0.11.0", "sqlite3_abi", "stew", "unittest2", - "upraises >= 0.1.0 & < 0.2.0" + "pretty", + "threading", + "taskpools", + "upraises >= 0.1.0 & < 0.2.0", + "chronicles" task coverage, "generates code coverage report": var (output, exitCode) = gorgeEx("which lcov") diff --git a/datastore/datastore.nim b/datastore/datastore.nim index d7fa45fd..98b95208 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -13,32 +13,32 @@ push: {.upraises: [].} type BatchEntry* = tuple[key: Key, data: seq[byte]] -method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} = +method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown".} = +method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown".} = +method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown".} = +method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, locks: "unknown".} = +method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown".} = +method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") -method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} = +method close*(self: Datastore): Future[?!void] {.base, locks: "unknown", raises: [].} = raiseAssert("Not implemented!") method query*( self: Datastore, - query: Query): Future[?!QueryIter] {.base, gcsafe.} = + query: Query): Future[?!QueryIter] {.base, gcsafe, raises: [].} = raiseAssert("Not implemented!") -proc contains*(self: Datastore, key: Key): Future[bool] {.async.} = +proc contains*(self: Datastore, key: Key): Future[bool] {.async, raises: [].} = return (await self.has(key)) |? false diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 163a52ab..65f5cdd4 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -155,6 +155,9 @@ method put*( return success() proc dirWalker(path: string): iterator: string {.gcsafe.} = + var localPath {.threadvar.}: string + + localPath = path return iterator(): string = try: for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true): @@ -188,10 +191,23 @@ method query*( var iter = QueryIter.new() + var lock = newAsyncLock() # serialize querying under threads proc next(): Future[?!QueryResponse] {.async.} = + defer: + if lock.locked: + lock.release() + + if lock.locked: + return failure (ref DatastoreError)(msg: "Should always await query features") + let path = walker() + if iter.finished: + return failure "iterator is finished" + + await lock.acquire() + if finished(walker): iter.finished = true return success (Key.none, EmptyBytes) diff --git a/datastore/query.nim b/datastore/query.nim index 1c51a6e2..fe6b18f0 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -1,4 +1,5 @@ import pkg/upraises +import std/algorithm import pkg/chronos import pkg/questionable import pkg/questionable/results @@ -6,11 +7,10 @@ import pkg/questionable/results import ./key import ./types -type - SortOrder* {.pure.} = enum - Assending, - Descending +export types +export options, SortOrder +type Query* = object key*: Key # Key to be queried value*: bool # Flag to indicate if data should be returned @@ -19,11 +19,10 @@ type sort*: SortOrder # Sort order - not available in all backends QueryResponse* = tuple[key: ?Key, data: seq[byte]] - QueryEndedError* = object of DatastoreError - GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.} + GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe.} IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.} - QueryIter* = ref object + QueryIter* {.acyclic.} = ref object finished*: bool next*: GetNext dispose*: IterDispose @@ -42,7 +41,7 @@ proc init*( T: type Query, key: Key, value = true, - sort = SortOrder.Assending, + sort = SortOrder.Ascending, offset = 0, limit = -1): T = diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index aa632747..e5fad22d 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -151,9 +151,19 @@ method query*( if not (v == SQLITE_OK): return failure newException(DatastoreError, $sqlite3_errstr(v)) + let lock = newAsyncLock() proc next(): Future[?!QueryResponse] {.async.} = + defer: + if lock.locked: + lock.release() + + if lock.locked: + return failure (ref DatastoreError)(msg: "Should always await query features") + if iter.finished: - return failure(newException(QueryEndedError, "Calling next on a finished query!")) + return failure((ref QueryEndedError)(msg: "Calling next on a finished query!")) + + await lock.acquire() let v = sqlite3_step(s) diff --git a/datastore/sql/sqliteutils.nim b/datastore/sql/sqliteutils.nim index bffb6e8c..4d2254ab 100644 --- a/datastore/sql/sqliteutils.nim +++ b/datastore/sql/sqliteutils.nim @@ -50,7 +50,8 @@ proc bindParam( # to the return from sqlite3_bind_*(). The object and pointer to it # must remain valid until then. SQLite will then manage the lifetime of # its private copy." - sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, + var val = val + sqlite3_bind_blob(s, n.cint, addr val[0], val.len.cint, SQLITE_TRANSIENT) else: sqlite3_bind_null(s, n.cint) diff --git a/datastore/threads/asyncsemaphore.nim b/datastore/threads/asyncsemaphore.nim new file mode 100644 index 00000000..c9a76276 --- /dev/null +++ b/datastore/threads/asyncsemaphore.nim @@ -0,0 +1,96 @@ +# Nim-Libp2p +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.push raises: [].} + +import sequtils +import chronos, chronicles + +# TODO: this should probably go in chronos + +logScope: + topics = "datastore semaphore" + +type + AsyncSemaphore* = ref object of RootObj + size*: int + count: int + queue: seq[Future[void]] + +func new*(_: type AsyncSemaphore, size: int): AsyncSemaphore = + AsyncSemaphore(size: size, count: size) + +proc `count`*(s: AsyncSemaphore): int = s.count + +proc tryAcquire*(s: AsyncSemaphore): bool = + ## Attempts to acquire a resource, if successful + ## returns true, otherwise false + ## + + if s.count > 0 and s.queue.len == 0: + s.count.dec + trace "Acquired slot", available = s.count, queue = s.queue.len + return true + +proc acquire*(s: AsyncSemaphore): Future[void] = + ## Acquire a resource and decrement the resource + ## counter. If no more resources are available, + ## the returned future will not complete until + ## the resource count goes above 0. + ## + + let fut = newFuture[void]("AsyncSemaphore.acquire") + if s.tryAcquire(): + fut.complete() + return fut + + proc cancellation(udata: pointer) {.gcsafe.} = + fut.cancelCallback = nil + if not fut.finished: + s.queue.keepItIf( it != fut ) + + fut.cancelCallback = cancellation + + s.queue.add(fut) + + trace "Queued slot", available = s.count, queue = s.queue.len + return fut + +proc forceAcquire*(s: AsyncSemaphore) = + ## ForceAcquire will always succeed, + ## creating a temporary slot if required. + ## This temporary slot will stay usable until + ## there is less `acquire`s than `release`s + s.count.dec + +proc release*(s: AsyncSemaphore) = + ## Release a resource from the semaphore, + ## by picking the first future from the queue + ## and completing it and incrementing the + ## internal resource count + ## + + doAssert(s.count <= s.size) + + if s.count < s.size: + trace "Releasing slot", available = s.count, + queue = s.queue.len + + s.count.inc + while s.queue.len > 0: + var fut = s.queue[0] + s.queue.delete(0) + if not fut.finished(): + s.count.dec + fut.complete() + break + + trace "Released slot", available = s.count, + queue = s.queue.len + return diff --git a/datastore/threads/databuffer.nim b/datastore/threads/databuffer.nim new file mode 100644 index 00000000..66bb7e66 --- /dev/null +++ b/datastore/threads/databuffer.nim @@ -0,0 +1,92 @@ +import threading/smartptrs +import std/hashes +import pkg/stew/ptrops + +export hashes + +type + DataBufferHolder* = object + buf: ptr UncheckedArray[byte] + size: int + + DataBuffer* = SharedPtr[DataBufferHolder] ##\ + ## A fixed length data buffer using a SharedPtr. + ## It is thread safe even with `refc` since + ## it doesn't use string or seq types internally. + ## + +proc `=destroy`*(x: var DataBufferHolder) = + ## copy pointer implementation + ## + + if x.buf != nil: + # echo "buffer: FREE: ", repr x.buf.pointer + deallocShared(x.buf) + +proc len*(a: DataBuffer): int = a[].size + +proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a) + +proc hash*(a: DataBuffer): Hash = + a[].buf.toOpenArray(0, a[].size-1).hash() + +proc `==`*(a, b: DataBuffer): bool = + if a.isNil and b.isNil: return true + elif a.isNil or b.isNil: return false + elif a[].size != b[].size: return false + elif a[].buf == b[].buf: return true + else: a.hash() == b.hash() + +proc new*(tp: type DataBuffer, size: int = 0): DataBuffer = + ## allocate new buffer with given size + ## + + newSharedPtr(DataBufferHolder( + buf: cast[typeof(result[].buf)](allocShared0(size)), + size: size, + )) + +proc new*[T: byte | char](tp: type DataBuffer, data: openArray[T]): DataBuffer = + ## allocate new buffer and copies indata from openArray + ## + result = DataBuffer.new(data.len) + if data.len() > 0: + # TODO: we might want to copy data, otherwise the GC might + # release it on stack-unwind + copyMem(result[].buf, baseAddr data, data.len) + +converter toSeq*(self: DataBuffer): seq[byte] = + ## convert buffer to a seq type using copy and either a byte or char + ## + + result = newSeq[byte](self.len) + if self.len() > 0: + copyMem(addr result[0], addr self[].buf[0], self.len) + +proc `@`*(self: DataBuffer): seq[byte] = + ## Convert a buffer to a seq type using copy and + ## either a byte or char + ## + + self.toSeq() + +converter toString*(data: DataBuffer): string = + ## convert buffer to string type using copy + ## + + if data.isNil: return "" + result = newString(data.len()) + if data.len() > 0: + copyMem(addr result[0], addr data[].buf[0], data.len) + +proc `$`*(data: DataBuffer): string = + ## convert buffer to string type using copy + ## + + data.toString() + +converter toBuffer*(err: ref CatchableError): DataBuffer = + ## convert exception to an object with StringBuffer + ## + + return DataBuffer.new(err.msg) diff --git a/datastore/threads/semaphore.nim b/datastore/threads/semaphore.nim new file mode 100644 index 00000000..5087e3f9 --- /dev/null +++ b/datastore/threads/semaphore.nim @@ -0,0 +1,58 @@ +import std/atomics +import std/locks + +type + Semaphore* = object + count: int + size: int + lock {.align: 64.}: Lock + cond: Cond + +func `=`*(dst: var Semaphore, src: Semaphore) {.error: "A semaphore cannot be copied".} +func `=sink`*(dst: var Semaphore, src: Semaphore) {.error: "An semaphore cannot be moved".} + +proc init*(_: type Semaphore, count: uint): Semaphore = + var + lock: Lock + cond: Cond + + lock.initLock() + cond.initCond() + + Semaphore(count: count.int, size: count.int, lock: lock, cond: cond) + +proc `=destroy`*(self: var Semaphore) = + self.lock.deinitLock() + self.cond.deinitCond() + +proc count*(self: var Semaphore): int = + self.count + +proc size*(self: var Semaphore): int = + self.size + +proc acquire*(self: var Semaphore) {.inline.} = + self.lock.acquire() + while self.count <= 0: + self.cond.wait(self.lock) + + self.count -= 1 + self.lock.release() + +proc release*(self: var Semaphore) {.inline.} = + self.lock.acquire() + if self.count <= 0: + self.count += 1 + self.cond.signal() + + doAssert not (self.count > self.size), + "Semaphore count is greather than size: " & $self.size & " count is: " & $self.count + + self.lock.release() + +template withSemaphore*(self: var Semaphore, body: untyped) = + self.acquire() + try: + body + finally: + self.release() diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim new file mode 100644 index 00000000..31a65baf --- /dev/null +++ b/datastore/threads/threadproxyds.nim @@ -0,0 +1,455 @@ + +when not compileOption("threads"): + {.error: "This module requires --threads:on compilation flag".} + +import pkg/upraises + +push: {.upraises: [].} + +import std/atomics +import std/strutils +import std/tables +import std/sequtils + +import pkg/chronos +import pkg/chronos/threadsync +import pkg/questionable +import pkg/questionable/results +import pkg/stew/ptrops +import pkg/taskpools +import pkg/stew/byteutils +import pkg/chronicles + +import ../key +import ../query +import ../datastore + +import ./asyncsemaphore +import ./databuffer +import ./threadresult + +export threadresult + +logScope: + topics = "datastore threadproxyds" + +type + TaskCtx[T: ThreadTypes] = object + ds: Datastore + res: ptr ThreadResult[T] + cancelled: bool + semaphore: AsyncSemaphore + signal: ThreadSignalPtr + + ThreadDatastore* = ref object of Datastore + tp: Taskpool + ds: Datastore + semaphore: AsyncSemaphore # semaphore is used for backpressure \ + # to avoid exhausting file descriptors + withLocks: bool + tasks: Table[Key, Future[void]] + queryLock: AsyncLock # global query lock, this is only really \ + # needed for the fsds, but it is expensive! + +template withLocks( + self: ThreadDatastore, + ctx: TaskCtx, + key: ?Key = Key.none, + fut: Future[void], + body: untyped): untyped = + try: + if key.isSome and key.get in self.tasks: + if self.withLocks: + await self.tasks[key.get] + self.tasks[key.get] = fut # we alway want to store the future, but only await if we're using locks + + if self.withLocks: + await self.queryLock.acquire() # only lock if it's required (fsds) + + block: + body + finally: + if self.withLocks: + if key.isSome and key.get in self.tasks: + self.tasks.del(key.get) + if self.queryLock.locked: + self.queryLock.release() + +# TODO: needs rework, we can't use `result` with async +template dispatchTask[T]( + self: ThreadDatastore, + ctx: TaskCtx[T], + key: ?Key = Key.none, + runTask: proc): auto = + try: + await self.semaphore.acquire() + let signal = ThreadSignalPtr.new() + if signal.isErr: + failure(signal.error) + else: + ctx.signal = signal.get() + let + fut = wait(ctx.signal) + + withLocks(self, ctx, key, fut): + runTask() + await fut + if ctx.res[].isErr: + failure ctx.res[].error + else: + when result.T isnot void: + success result.T(ctx.res[].get) + else: + success() + except CancelledError as exc: + trace "Cancelling thread future!", exc = exc.msg + ctx.cancelled = true + await ctx.signal.fire() + raise exc + finally: + discard ctx.signal.close() + self.semaphore.release() + +proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = + ## Monitor the signal and cancel the future if + ## the cancellation flag is set + ## + + if ctx.isNil: + trace "ctx is nil" + return + + try: + await ctx[].signal.wait() + trace "Received signal" + + if ctx[].cancelled: # there could eventually be other flags + trace "Cancelling future" + if not fut.finished: + await fut.cancelAndWait() # cancel the `has` future + + discard ctx[].signal.fireSync() + except CatchableError as exc: + trace "Exception in thread signal monitor", exc = exc.msg + ctx[].res[].err(exc) + discard ctx[].signal.fireSync() + +proc asyncHasTask( + ctx: ptr TaskCtx[bool], + key: ptr Key) {.async.} = + if ctx.isNil: + trace "ctx is nil" + return + + let + key = key[] + fut = ctx[].ds.has(key) + + asyncSpawn signalMonitor(ctx, fut) + without ret =? (await fut).catch and res =? ret, error: + ctx[].res[].err(error) + return + + ctx[].res[].ok(res) + +proc hasTask(ctx: ptr TaskCtx, key: ptr Key) = + defer: + if not ctx.isNil: + discard ctx[].signal.fireSync() + + try: + waitFor asyncHasTask(ctx, key) + except CatchableError as exc: + trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg + raiseAssert exc.msg + +method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = + var + key = key + res = ThreadResult[bool]() + ctx = TaskCtx[bool]( + ds: self.ds, + res: addr res) + + proc runTask() = + self.tp.spawn hasTask(addr ctx, addr key) + + return self.dispatchTask(ctx, key.some, runTask) + +proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = + if ctx.isNil: + trace "ctx is nil" + return + + let + key = key[] + fut = ctx[].ds.delete(key) + + asyncSpawn signalMonitor(ctx, fut) + without res =? (await fut).catch, error: + trace "Error in asyncDelTask", error = error.msg + ctx[].res[].err(error) + return + + ctx[].res[].ok() + return + +proc delTask(ctx: ptr TaskCtx, key: ptr Key) = + defer: + if not ctx.isNil: + discard ctx[].signal.fireSync() + + try: + waitFor asyncDelTask(ctx, key) + except CatchableError as exc: + trace "Unexpected exception thrown in asyncDelTask", exc = exc.msg + raiseAssert exc.msg + +method delete*( + self: ThreadDatastore, + key: Key): Future[?!void] {.async.} = + var + key = key + res = ThreadResult[void]() + ctx = TaskCtx[void]( + ds: self.ds, + res: addr res) + + proc runTask() = + self.tp.spawn delTask(addr ctx, addr key) + + return self.dispatchTask(ctx, key.some, runTask) + +method delete*( + self: ThreadDatastore, + keys: seq[Key]): Future[?!void] {.async.} = + + for key in keys: + if err =? (await self.delete(key)).errorOption: + return failure err + + return success() + +proc asyncPutTask( + ctx: ptr TaskCtx[void], + key: ptr Key, + data: ptr UncheckedArray[byte], + len: int) {.async.} = + + if ctx.isNil: + trace "ctx is nil" + return + + let + key = key[] + data = @(data.toOpenArray(0, len - 1)) + fut = ctx[].ds.put(key, data) + + asyncSpawn signalMonitor(ctx, fut) + without res =? (await fut).catch, error: + trace "Error in asyncPutTask", error = error.msg + ctx[].res[].err(error) + return + + ctx[].res[].ok() + +proc putTask( + ctx: ptr TaskCtx, + key: ptr Key, + data: ptr UncheckedArray[byte], + len: int) = + ## run put in a thread task + ## + + defer: + if not ctx.isNil: + discard ctx[].signal.fireSync() + + try: + waitFor asyncPutTask(ctx, key, data, len) + except CatchableError as exc: + trace "Unexpected exception thrown in asyncPutTask", exc = exc.msg + raiseAssert exc.msg + +method put*( + self: ThreadDatastore, + key: Key, + data: seq[byte]): Future[?!void] {.async.} = + var + key = key + data = data + res = ThreadResult[void]() + ctx = TaskCtx[void]( + ds: self.ds, + res: addr res) + + proc runTask() = + self.tp.spawn putTask( + addr ctx, + addr key, + makeUncheckedArray(addr data[0]), + data.len) + + return self.dispatchTask(ctx, key.some, runTask) + +method put*( + self: ThreadDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async.} = + + for entry in batch: + if err =? (await self.put(entry.key, entry.data)).errorOption: + return failure err + + return success() + +proc asyncGetTask( + ctx: ptr TaskCtx[DataBuffer], + key: ptr Key) {.async.} = + if ctx.isNil: + trace "ctx is nil" + return + + let + key = key[] + fut = ctx[].ds.get(key) + + asyncSpawn signalMonitor(ctx, fut) + without res =? (await fut).catch and data =? res, error: + trace "Error in asyncGetTask", error = error.msg + ctx[].res[].err(error) + return + + trace "Got data in get" + ctx[].res[].ok(DataBuffer.new(data)) + +proc getTask( + ctx: ptr TaskCtx, + key: ptr Key) = + ## Run get in a thread task + ## + + defer: + if not ctx.isNil: + discard ctx[].signal.fireSync() + + try: + waitFor asyncGetTask(ctx, key) + except CatchableError as exc: + trace "Unexpected exception thrown in asyncGetTask", exc = exc.msg + raiseAssert exc.msg + +method get*( + self: ThreadDatastore, + key: Key): Future[?!seq[byte]] {.async.} = + var + key = key + res = ThreadResult[DataBuffer]() + ctx = TaskCtx[DataBuffer]( + ds: self.ds, + res: addr res) + + proc runTask() = + self.tp.spawn getTask(addr ctx, addr key) + + return self.dispatchTask(ctx, key.some, runTask) + +method close*(self: ThreadDatastore): Future[?!void] {.async.} = + for fut in self.tasks.values.toSeq: + await fut.cancelAndWait() # probably want to store the signal, instead of the future (or both?) + + await self.ds.close() + +proc asyncQueryTask( + ctx: ptr TaskCtx, + iter: ptr QueryIter) {.async.} = + + if ctx.isNil or iter.isNil: + trace "ctx is nil" + return + + let + fut = iter[].next() + + asyncSpawn signalMonitor(ctx, fut) + without ret =? (await fut).catch and res =? ret, error: + trace "Error in asyncQueryTask", error = error.msg + ctx[].res[].err(error) + return + + if res.key.isNone: + ctx[].res[].ok((default(DataBuffer), default(DataBuffer))) + return + + var + keyBuf = DataBuffer.new($(res.key.get())) + dataBuf = DataBuffer.new(res.data) + + trace "Got query result", key = $res.key.get(), data = res.data + ctx[].res[].ok((keyBuf, dataBuf)) + +proc queryTask( + ctx: ptr TaskCtx, + iter: ptr QueryIter) = + + defer: + if not ctx.isNil: + discard ctx[].signal.fireSync() + + try: + waitFor asyncQueryTask(ctx, iter) + except CatchableError as exc: + trace "Unexpected exception thrown in asyncQueryTask", exc = exc.msg + raiseAssert exc.msg + +method query*( + self: ThreadDatastore, + query: Query): Future[?!QueryIter] {.async.} = + without var childIter =? await self.ds.query(query), error: + return failure error + + var + iter = QueryIter.new() + lock = newAsyncLock() # serialize querying under threads + + proc next(): Future[?!QueryResponse] {.async.} = + defer: + if lock.locked: + lock.release() + + trace "About to query" + if lock.locked: + return failure (ref DatastoreError)(msg: "Should always await query features") + + await lock.acquire() + + if iter.finished == true: + return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") + + iter.finished = childIter.finished + var + res = ThreadResult[ThreadQueryRes]() + ctx = TaskCtx[ThreadQueryRes]( + ds: self.ds, + res: addr res) + + proc runTask() = + self.tp.spawn queryTask(addr ctx, addr childIter) + + return self.dispatchTask(ctx, Key.none, runTask) + + iter.next = next + return success iter + +proc new*( + self: type ThreadDatastore, + ds: Datastore, + withLocks = static false, + tp: Taskpool): ?!ThreadDatastore = + doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" + + success ThreadDatastore( + tp: tp, + ds: ds, + withLocks: withLocks, + queryLock: newAsyncLock(), + semaphore: AsyncSemaphore.new(tp.numThreads - 1)) diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim new file mode 100644 index 00000000..fcb7ffdd --- /dev/null +++ b/datastore/threads/threadresult.nim @@ -0,0 +1,43 @@ +import std/atomics +import std/options + +import pkg/questionable/results +import pkg/results + +import ../types +import ../query +import ../key + +import ./databuffer + +type + ErrorEnum* {.pure.} = enum + DatastoreErr, + DatastoreKeyNotFoundErr, + QueryEndedErr, + CatchableErr + + ThreadTypes* = void | bool | SomeInteger | DataBuffer | tuple | Atomic + ThreadResErr* = (ErrorEnum, DataBuffer) + ThreadQueryRes* = (DataBuffer, DataBuffer) + ThreadResult*[T: ThreadTypes] = Result[T, ThreadResErr] + +converter toThreadErr*(e: ref CatchableError): ThreadResErr {.inline, raises: [].} = + if e of DatastoreKeyNotFound: (ErrorEnum.DatastoreKeyNotFoundErr, DataBuffer.new(e.msg)) + elif e of QueryEndedError: (ErrorEnum.QueryEndedErr, DataBuffer.new(e.msg)) + elif e of DatastoreError: (DatastoreErr, DataBuffer.new(e.msg)) + elif e of CatchableError: (CatchableErr, DataBuffer.new(e.msg)) + else: raise (ref Defect)(msg: e.msg) + +converter toExc*(e: ThreadResErr): ref CatchableError = + case e[0]: + of ErrorEnum.DatastoreKeyNotFoundErr: (ref DatastoreKeyNotFound)(msg: $e[1]) + of ErrorEnum.QueryEndedErr: (ref QueryEndedError)(msg: $e[1]) + of ErrorEnum.DatastoreErr: (ref DatastoreError)(msg: $e[1]) + of ErrorEnum.CatchableErr: (ref CatchableError)(msg: $e[1]) + +converter toQueryResponse*(r: ThreadQueryRes): QueryResponse = + if not r[0].isNil and r[0].len > 0 and key =? Key.init($r[0]): + (key.some, @(r[1])) + else: + (Key.none, EmptyBytes) diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim index 4eca23be..8448f9bf 100644 --- a/datastore/tieredds.nim +++ b/datastore/tieredds.nim @@ -74,13 +74,16 @@ method get*( bytes: seq[byte] for store in self.stores: - without bytes =? (await store.get(key)): - continue + without bytes =? (await store.get(key)), err: + if err of DatastoreKeyNotFound: + continue + else: + return failure(err) if bytes.len <= 0: continue - # put found data into stores logically in front of the current store + # put found data into stores in front of the current store for s in self.stores: if s == store: break if( @@ -88,7 +91,10 @@ method get*( res.isErr): return failure res.error - return success bytes + if bytes.len > 0: + return success bytes + + return failure (ref DatastoreKeyNotFound)(msg: "Key not found") method put*( self: TieredDatastore, diff --git a/datastore/types.nim b/datastore/types.nim index b019cdb0..6000faf1 100644 --- a/datastore/types.nim +++ b/datastore/types.nim @@ -6,5 +6,6 @@ const type DatastoreError* = object of CatchableError DatastoreKeyNotFound* = object of DatastoreError + QueryEndedError* = object of DatastoreError - Datastore* = ref object of RootObj + Datastore* {.acyclic.} = ref object of RootObj diff --git a/nimble.lock b/nimble.lock new file mode 100644 index 00000000..3a208e32 --- /dev/null +++ b/nimble.lock @@ -0,0 +1,214 @@ +{ + "version": 2, + "packages": { + "upraises": { + "version": "0.1.0", + "vcsRevision": "d9f268db1021959fe0f2c7a5e49fba741f9932a0", + "url": "https://github.com/markspanbroek/upraises", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "176234f808b44a0be763df706ed634d6e8df17bb" + } + }, + "results": { + "version": "0.4.0", + "vcsRevision": "f3c666a272c69d70cb41e7245e7f6844797303ad", + "url": "https://github.com/arnetheduck/nim-results", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "51e08ca9524db98dc909fb39192272cc2b5451c7" + } + }, + "unittest2": { + "version": "0.1.0", + "vcsRevision": "2300fa9924a76e6c96bc4ea79d043e3a0f27120c", + "url": "https://github.com/status-im/nim-unittest2", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "914cf9a380c83b2ae40697981e5d94903505e87e" + } + }, + "stew": { + "version": "0.1.0", + "vcsRevision": "3159137d9a3110edb4024145ce0ba778975de40e", + "url": "https://github.com/status-im/nim-stew", + "downloadMethod": "git", + "dependencies": [ + "results", + "unittest2" + ], + "checksums": { + "sha1": "4ab494e272e997011853faddebe9e55183613776" + } + }, + "faststreams": { + "version": "0.3.0", + "vcsRevision": "720fc5e5c8e428d9d0af618e1e27c44b42350309", + "url": "https://github.com/status-im/nim-faststreams", + "downloadMethod": "git", + "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "ab178ba25970b95d953434b5d86b4d60396ccb64" + } + }, + "serialization": { + "version": "0.2.0", + "vcsRevision": "4bdbc29e54fe54049950e352bb969aab97173b35", + "url": "https://github.com/status-im/nim-serialization", + "downloadMethod": "git", + "dependencies": [ + "faststreams", + "unittest2", + "stew" + ], + "checksums": { + "sha1": "c8c99a387aae488e7008aded909ebfe662e74450" + } + }, + "sqlite3_abi": { + "version": "3.40.1.1", + "vcsRevision": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3", + "url": "https://github.com/arnetheduck/nim-sqlite3-abi", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "8e91db8156a82383d9c48f53b33e48f4e93077b1" + } + }, + "testutils": { + "version": "0.5.0", + "vcsRevision": "dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34", + "url": "https://github.com/status-im/nim-testutils", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "756d0757c4dd06a068f9d38c7f238576ba5ee897" + } + }, + "json_serialization": { + "version": "0.1.5", + "vcsRevision": "85b7ea093cb85ee4f433a617b97571bd709d30df", + "url": "https://github.com/status-im/nim-json-serialization", + "downloadMethod": "git", + "dependencies": [ + "serialization", + "stew" + ], + "checksums": { + "sha1": "c6b30565292acf199b8be1c62114726e354af59e" + } + }, + "chronicles": { + "version": "0.10.3", + "vcsRevision": "32ac8679680ea699f7dbc046e8e0131cac97d41a", + "url": "https://github.com/status-im/nim-chronicles", + "downloadMethod": "git", + "dependencies": [ + "testutils", + "json_serialization" + ], + "checksums": { + "sha1": "79f09526d4d9b9196dd2f6a75310d71a890c4f88" + } + }, + "asynctest": { + "version": "0.3.2", + "vcsRevision": "a236a5f0f3031573ac2cb082b63dbf6e170e06e7", + "url": "https://github.com/markspanbroek/asynctest", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "0ef50d086659835b0a23a4beb77cb11747695448" + } + }, + "httputils": { + "version": "0.3.0", + "vcsRevision": "87b7cbf032c90b9e6b446081f4a647e950362cec", + "url": "https://github.com/status-im/nim-http-utils", + "downloadMethod": "git", + "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "72a138157a9951f0986a9c4afc8c9a83ce3979a8" + } + }, + "taskpools": { + "version": "0.0.4", + "vcsRevision": "b3673c7a7a959ccacb393bd9b47e997bbd177f5a", + "url": "https://github.com/status-im/nim-taskpools", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "e43c5170d4e9ef1b27dd0956ffa0db46f992f9a6" + } + }, + "pretty": { + "version": "0.1.0", + "vcsRevision": "60850f8c595d4f0b9e55f3a99c7c471244a3182a", + "url": "https://github.com/treeform/pretty", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "b0a1cf8607d169050acd2bdca8d76fe850c23648" + } + }, + "bearssl": { + "version": "0.2.1", + "vcsRevision": "e4157639db180e52727712a47deaefcbbac6ec86", + "url": "https://github.com/status-im/nim-bearssl", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "a5086fd5c0af2b852f34c0cc6e4cff93a98f97ec" + } + }, + "chronos": { + "version": "3.2.0", + "vcsRevision": "0277b65be2c7a365ac13df002fba6e172be55537", + "url": "https://github.com/status-im/nim-chronos", + "downloadMethod": "git", + "dependencies": [ + "stew", + "bearssl", + "httputils", + "unittest2" + ], + "checksums": { + "sha1": "78a41db7fb05b937196d4fa2f1e3fb4353b36a07" + } + }, + "questionable": { + "version": "0.10.10", + "vcsRevision": "b3cf35ac450fd42c9ea83dc084f5cba2efc55da3", + "url": "https://github.com/markspanbroek/questionable", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "8bb23a05d7f21619010471aa009e27d3fa73d93a" + } + }, + "threading": { + "version": "0.2.0", + "vcsRevision": "bcc284991ba928d1ed299a81a93b7113cc7de04f", + "url": "https://github.com/nim-lang/threading", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "08dfc46cedc3fc202e5de6ef80a655598331eb89" + } + } + }, + "tasks": {} +} diff --git a/tests/config.nims b/tests/config.nims new file mode 100644 index 00000000..0f840a15 --- /dev/null +++ b/tests/config.nims @@ -0,0 +1 @@ +--path:".." diff --git a/tests/datastore/dscommontests.nim b/tests/datastore/dscommontests.nim index 1e0353cb..24dd7d39 100644 --- a/tests/datastore/dscommontests.nim +++ b/tests/datastore/dscommontests.nim @@ -3,6 +3,7 @@ import std/options import pkg/asynctest import pkg/chronos import pkg/stew/results +import pkg/questionable/results import pkg/datastore @@ -56,3 +57,9 @@ proc basicStoreTests*( for k in batch: check: not (await ds.has(k)).tryGet + + test "handle missing key": + let key = Key.init("/missing/key").tryGet() + + expect(DatastoreKeyNotFound): + discard (await ds.get(key)).tryGet() # non existing key diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim index 4f0a15d8..a0f93a7f 100644 --- a/tests/datastore/querycommontests.nim +++ b/tests/datastore/querycommontests.nim @@ -36,9 +36,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 3 @@ -63,9 +74,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 3 @@ -90,9 +112,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 2 @@ -117,9 +150,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = iter = (await ds.query(q)).tryGet var - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res res.sort do (a, b: QueryResponse) -> int: cmp(a.key.get.id, b.key.get.id) @@ -152,9 +196,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 10 @@ -175,9 +230,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 10 @@ -198,9 +264,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 5 @@ -237,9 +314,20 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = kvs = kvs.reversed let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + res = block: + var + res: seq[QueryResponse] + cnt = 0 + + for pair in iter: + let (key, val) = (await pair).tryGet + if key.isNone: + break + + res.add((key, val)) + cnt.inc + + res check: res.len == 100 diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index 49910eb6..c629eb0c 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -3,7 +3,7 @@ import std/os import std/sequtils from std/algorithm import sort, reversed -import pkg/asynctest/unittest2 +import pkg/asynctest import pkg/chronos import pkg/stew/results import pkg/stew/byteutils diff --git a/tests/datastore/testasyncsemaphore.nim b/tests/datastore/testasyncsemaphore.nim new file mode 100644 index 00000000..3c8bb84b --- /dev/null +++ b/tests/datastore/testasyncsemaphore.nim @@ -0,0 +1,206 @@ +{.used.} + +# Nim-Libp2p +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import random + +import pkg/chronos +import pkg/asynctest + +import pkg/datastore/threads/asyncsemaphore + +randomize() + +suite "AsyncSemaphore": + test "Should acquire": + let sema = AsyncSemaphore.new(3) + + await sema.acquire() + await sema.acquire() + await sema.acquire() + + check sema.count == 0 + + test "Should release": + let sema = AsyncSemaphore.new(3) + + await sema.acquire() + await sema.acquire() + await sema.acquire() + + check sema.count == 0 + sema.release() + sema.release() + sema.release() + check sema.count == 3 + + test "Should queue acquire": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + let fut = sema.acquire() + + check sema.count == 0 + sema.release() + sema.release() + check sema.count == 1 + + await sleepAsync(10.millis) + check fut.finished() + + test "Should keep count == size": + let sema = AsyncSemaphore.new(1) + sema.release() + sema.release() + sema.release() + check sema.count == 1 + + test "Should tryAcquire": + let sema = AsyncSemaphore.new(1) + await sema.acquire() + check sema.tryAcquire() == false + + test "Should tryAcquire and acquire": + let sema = AsyncSemaphore.new(4) + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.count == 0 + + let fut = sema.acquire() + check fut.finished == false + check sema.count == 0 + + sema.release() + sema.release() + sema.release() + sema.release() + sema.release() + + check fut.finished == true + check sema.count == 4 + + test "Should restrict resource access": + let sema = AsyncSemaphore.new(3) + var resource = 0 + + proc task() {.async.} = + try: + await sema.acquire() + resource.inc() + check resource > 0 and resource <= 3 + let sleep = rand(0..10).millis + # echo sleep + await sleepAsync(sleep) + finally: + resource.dec() + sema.release() + + var tasks: seq[Future[void]] + for i in 0..<10: + tasks.add(task()) + + await allFutures(tasks) + + test "Should cancel sequential semaphore slot": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + + let + tmp = sema.acquire() + tmp2 = sema.acquire() + check: + not tmp.finished() + not tmp2.finished() + + tmp.cancel() + sema.release() + + check tmp2.finished() + + sema.release() + + check await sema.acquire().withTimeout(10.millis) + + test "Should handle out of order cancellations": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() # 1st acquire + let tmp1 = sema.acquire() # 2nd acquire + check not tmp1.finished() + + let tmp2 = sema.acquire() # 3rd acquire + check not tmp2.finished() + + let tmp3 = sema.acquire() # 4th acquire + check not tmp3.finished() + + # up to this point, we've called acquire 4 times + tmp1.cancel() # 1st release (implicit) + tmp2.cancel() # 2nd release (implicit) + + check not tmp3.finished() # check that we didn't release the wrong slot + + sema.release() # 3rd release (explicit) + check tmp3.finished() + + sema.release() # 4th release + check await sema.acquire().withTimeout(10.millis) + + test "Should properly handle timeouts and cancellations": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + check not(await sema.acquire().withTimeout(1.millis)) # Should not acquire but cancel + sema.release() + + check await sema.acquire().withTimeout(10.millis) + + test "Should handle forceAcquire properly": + let sema = AsyncSemaphore.new(1) + + await sema.acquire() + check not(await sema.acquire().withTimeout(1.millis)) # Should not acquire but cancel + + let + fut1 = sema.acquire() + fut2 = sema.acquire() + + sema.forceAcquire() + sema.release() + + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + not fut2.finished() + + sema.release() + await fut1 or fut2 or sleepAsync(1.millis) + check: + fut1.finished() + fut2.finished() + + + sema.forceAcquire() + sema.forceAcquire() + + let + fut3 = sema.acquire() + fut4 = sema.acquire() + fut5 = sema.acquire() + sema.release() + sema.release() + await sleepAsync(1.millis) + check: + fut3.finished() + fut4.finished() + not fut5.finished() diff --git a/tests/datastore/testdatabuffer.nim b/tests/datastore/testdatabuffer.nim new file mode 100644 index 00000000..7fc3ceba --- /dev/null +++ b/tests/datastore/testdatabuffer.nim @@ -0,0 +1,107 @@ +import std/options +import std/sequtils +import std/algorithm +import std/locks +import std/os +import pkg/stew/byteutils +import pkg/unittest2 +import pkg/questionable +import pkg/questionable/results +import pkg/datastore/key + +include ../../datastore/threads/databuffer + +var + shareVal: DataBuffer + lock: Lock + cond: Cond + +var threads: array[2,Thread[int]] + +proc thread1(val: int) {.thread.} = + echo "thread1" + {.cast(gcsafe).}: + for i in 0..