Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datastore.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "Apache License 2.0 or MIT"
requires "nim >= 1.6.14",
"asynctest >= 0.3.1 & < 0.4.0",
"chronos#0277b65be2c7a365ac13df002fba6e172be55537",
"questionable >= 0.10.3 & < 0.11.0",
"questionable",
"sqlite3_abi",
"stew",
"unittest2",
Expand Down
169 changes: 78 additions & 91 deletions datastore/threads/threadproxyds.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,50 +36,41 @@ logScope:

type

ThreadBackendKinds* = enum
Sqlite
# Filesystem

ThreadBackend* = object
## backend case type to avoid needing to make ThreadDatastore generic
case kind*: ThreadBackendKinds
of Sqlite:
sql*: SQLiteBackend[KeyId,DataBuffer]

TaskCtxObj*[T: ThreadTypes] = object
res: ThreadResult[T]
signal: ThreadSignalPtr
running: bool ## used to mark when a task worker is running
cancelled: bool ## used to cancel a task before it's started
nextSignal: ThreadSignalPtr

TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
## Task context object.
## This is a SharedPtr to make the query iter simpler

ThreadDatastore* = ref object of Datastore
ThreadDatastore*[BT] = ref object of Datastore
tp: Taskpool
backend: ThreadBackend
backend: BT
semaphore: AsyncSemaphore # semaphore is used for backpressure \
# to avoid exhausting file descriptors

var ctxLock: Lock
ctxLock.initLock()
proc newTaskCtx*[T](tp: typedesc[T],
signal: ThreadSignalPtr,
nextSignal: ThreadSignalPtr = nil): TaskCtx[T] =
newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal))

proc setCancelled[T](ctx: TaskCtx[T]) =
# withLock(ctxLock):
ctx[].cancelled = true

proc setRunning[T](ctx: TaskCtx[T]): bool =
# withLock(ctxLock):
if ctx[].cancelled:
return false
ctx[].running = true
return true
proc setDone[T](ctx: TaskCtx[T]) =
# withLock(ctxLock):
ctx[].running = false

proc acquireSignal(): ?!ThreadSignalPtr =
# echo "signal:OPEN!"
let signal = ThreadSignalPtr.new()
if signal.isErr():
failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error())
Expand Down Expand Up @@ -113,33 +104,31 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
ctx.setDone()
discard ctx[].signal.fireSync()

template dispatchTaskWrap[T](self: ThreadDatastore,
template dispatchTaskWrap[BT](self: ThreadDatastore[BT],
signal: ThreadSignalPtr,
blk: untyped
): auto =
var ds {.used, inject.} = self.backend
proc runTask() =
`blk`
runTask()
await wait(ctx[].signal)

template dispatchTask[BT](self: ThreadDatastore[BT],
signal: ThreadSignalPtr,
blk: untyped
): auto =
case self.backend.kind:
of Sqlite:
var ds {.used, inject.} = self.backend.sql
proc runTask() =
`blk`
runTask()
await wait(ctx[].signal)

template dispatchTask[T](self: ThreadDatastore,
signal: ThreadSignalPtr,
blk: untyped
): auto =
): auto =
## handles dispatching a task from an async context
## `blk` is the actions, it has `ctx` and `ds` variables in scope.
## note that `ds` is a generic
let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
try:
dispatchTaskWrap[T](self, signal, blk)
dispatchTaskWrap[BT](self, signal, blk)
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled()
raise exc
finally:
# echo "signal:CLOSE!"
discard ctx[].signal.close()
self.semaphore.release()

Expand All @@ -149,38 +138,39 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
executeTask(ctx):
has(ds, key)

method has*(self: ThreadDatastore,
key: Key): Future[?!bool] {.async.} =
method has*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!bool] {.async.} =
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err

let key = KeyId.new key.id()
dispatchTask[bool](self, signal):
let ctx = newTaskCtx(bool, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn hasTask(ctx, ds, key)
return ctx[].res.toRes(v => v)


proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
method deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe.} =
## run backend command
executeTask(ctx):
delete(ds, key)

method delete*(self: ThreadDatastore,
method delete*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!void] {.async.} =
## delete key
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err

let key = KeyId.new key.id()
dispatchTask[void](self, signal):
let ctx = newTaskCtx(void, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn deleteTask(ctx, ds, key)

return ctx[].res.toRes()

method delete*(self: ThreadDatastore,
method delete*[BT](self: ThreadDatastore[BT],
keys: seq[Key]): Future[?!void] {.async.} =
## delete batch
for key in keys:
Expand All @@ -196,23 +186,24 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
executeTask(ctx):
put(ds, key, data)

method put*(self: ThreadDatastore,
method put*[BT](self: ThreadDatastore[BT],
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
## put key with data
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err

dispatchTask[void](self, signal):
let ctx = newTaskCtx(void, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
let data = DataBuffer.new data
self.tp.spawn putTask(ctx, ds, key, data)

return ctx[].res.toRes()

method put*(
self: ThreadDatastore,
method put*[DB](
self: ThreadDatastore[DB],
batch: seq[BatchEntry]): Future[?!void] {.async.} =
## put batch data
for entry in batch:
Expand All @@ -222,42 +213,38 @@ method put*(
return success()


proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
method getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
key: KeyId) {.gcsafe, nimcall.} =
## run backend command
executeTask(ctx):
let res = get(ds, key)
res

method get*(self: ThreadDatastore,
key: Key,
): Future[?!seq[byte]] {.async.} =
method get*[BT](self: ThreadDatastore[BT],
key: Key,
): Future[?!seq[byte]] {.async.} =
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err

let key = KeyId.new key.id()
dispatchTask[DataBuffer](self, signal):
let ctx = newTaskCtx(DataBuffer, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn getTask(ctx, ds, key)

return ctx[].res.toRes(v => v.toSeq())

method close*(self: ThreadDatastore): Future[?!void] {.async.} =
method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
await self.semaphore.closeAll()
case self.backend.kind:
of Sqlite:
self.backend.sql.close()
self.backend.close()

type
QResult = DbQueryResponse[KeyId, DataBuffer]

import os

proc queryTask[DB](
method queryTask[DB](
ctx: TaskCtx[QResult],
ds: DB,
query: DbQuery[KeyId],
nextSignal: ThreadSignalPtr
) {.gcsafe, nimcall.} =
## run query command
executeTask(ctx):
Expand All @@ -271,8 +258,8 @@ proc queryTask[DB](
# otherwise manually an set empty ok result
ctx[].res.ok (KeyId.none, DataBuffer(), )
discard ctx[].signal.fireSync()
if not nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
if not ctx[].nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "queryTask timed out")

var handle = handleRes.get()
for item in handle.iter():
Expand All @@ -287,13 +274,13 @@ proc queryTask[DB](
exc

discard ctx[].signal.fireSync()

discard nextSignal.waitSync().get()
if not ctx[].nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "queryTask timed out")

# set final result
(?!QResult).ok((KeyId.none, DataBuffer()))

method query*(self: ThreadDatastore,
method query*[BT](self: ThreadDatastore[BT],
q: Query
): Future[?!QueryIter] {.async.} =
## performs async query
Expand All @@ -304,23 +291,34 @@ method query*(self: ThreadDatastore,
return failure err
without nextSignal =? acquireSignal(), err:
return failure err
let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal)

proc iterDispose() {.async.} =
# echo "signal:CLOSE!"
ctx.setCancelled()
await ctx[].nextSignal.fire()
discard ctx[].signal.close()
# echo "nextSignal:CLOSE!"
discard ctx[].nextSignal.close()
self.semaphore.release()

try:
let query = dbQuery(
key= KeyId.new q.key.id(),
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)

# setup initial queryTask
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
await nextSignal.fire()
dispatchTaskWrap(self, signal):
self.tp.spawn queryTask(ctx, ds, query)
await ctx[].nextSignal.fire()

var
lock = newAsyncLock() # serialize querying under threads
iter = QueryIter.new()
var lock = newAsyncLock() # serialize querying under threads
var iter = QueryIter.new()
iter.dispose = proc (): Future[?!void] {.async.} =
iterDispose()
success()

proc next(): Future[?!QueryResponse] {.async.} =
iter.next = proc(): Future[?!QueryResponse] {.async.} =
let ctx = ctx
try:
trace "About to query"
Expand All @@ -330,12 +328,11 @@ method query*(self: ThreadDatastore,
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")

await wait(ctx[].signal)

if not ctx[].running:
iter.finished = true

defer:
await nextSignal.fire()
await ctx[].nextSignal.fire()

if ctx[].res.isErr():
return err(ctx[].res.error())
Expand All @@ -347,34 +344,24 @@ method query*(self: ThreadDatastore,
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled()
discard ctx[].signal.close()
discard nextSignal.close()
self.semaphore.release()
await iterDispose() # todo: is this valid?
raise exc

iter.next = next
return success iter
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
discard signal.close()
discard nextSignal.close()
self.semaphore.release()
await iterDispose()
raise exc

proc new*[DB](self: type ThreadDatastore,
db: DB,
withLocks = static false,
tp: Taskpool
): ?!ThreadDatastore =
): ?!ThreadDatastore[DB] =
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"

when DB is SQLiteBackend[KeyId,DataBuffer]:
let backend = ThreadBackend(kind: Sqlite, sql: db)
else:
{.error: "unsupported backend: " & $typeof(db).}

success ThreadDatastore(
success ThreadDatastore[DB](
tp: tp,
backend: backend,
backend: db,
semaphore: AsyncSemaphore.new(tp.numThreads - 1)
)
Loading