Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Threadpool refactor fsds #55

Open
wants to merge 76 commits into
base: threadpool-refactor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
51d2b47
initial take at making fsds synchronous
elcritch Sep 27, 2023
24ac1a8
initial take at making fsds synchronous
elcritch Sep 27, 2023
653ec99
initial take at making fsds synchronous
elcritch Sep 27, 2023
5242b85
initial take at making fsds synchronous
elcritch Sep 27, 2023
bb9e343
initial take at making fsds synchronous
elcritch Sep 27, 2023
235232a
initial take at making fsds synchronous
elcritch Sep 27, 2023
5e42426
rework tuple types
elcritch Sep 27, 2023
49d846f
make Threadproxyds generic
elcritch Sep 27, 2023
b4b534b
loop tests
elcritch Sep 27, 2023
0300841
loop tests
elcritch Sep 27, 2023
a4f0c48
fix merge
elcritch Sep 27, 2023
1fd80c6
fix merge
elcritch Sep 27, 2023
c51d354
add nextSignal using mutex
elcritch Sep 27, 2023
30728c1
add nextSignal using mutex
elcritch Sep 27, 2023
e571d21
add nextSignal using mutex
elcritch Sep 27, 2023
3cc21b3
add nextSignal using mutex
elcritch Sep 27, 2023
1da59ba
add nextSignal using mutex
elcritch Sep 27, 2023
ac77917
remove ctx locks
elcritch Sep 27, 2023
ca5695f
remove ctx locks
elcritch Sep 27, 2023
fbc0061
remove ctx locks
elcritch Sep 27, 2023
efd2e1d
running 1000+ outer loops
elcritch Sep 27, 2023
a4748ef
running 1000+ outer loops
elcritch Sep 27, 2023
78ea3b1
global taskpool
elcritch Sep 27, 2023
5afec2b
change nextSignal back to ThreadSignalPtr for timeouts
elcritch Sep 27, 2023
6952987
running 1000+ outer loops
elcritch Sep 27, 2023
74953e1
try newer questionable?
elcritch Sep 27, 2023
8db2857
Merge branch 'threadpool-refactor-generics' into threadpool-refactor-…
elcritch Sep 27, 2023
9fdba84
try newer questionable?
elcritch Sep 28, 2023
10bad7e
try newer questionable?
elcritch Sep 28, 2023
2eb0ee6
update fsds
elcritch Sep 28, 2023
1317120
update fsds
elcritch Sep 28, 2023
bda61df
update fsds
elcritch Sep 28, 2023
36ec858
refactor tests
elcritch Sep 28, 2023
439fd92
refactor tests
elcritch Sep 28, 2023
c215f9c
refactor tests
elcritch Sep 28, 2023
ef5a30f
refactor tests
elcritch Sep 28, 2023
ed34fe2
refactor tests
elcritch Sep 28, 2023
2a96b1b
refactor tests
elcritch Sep 28, 2023
4fa532e
refactor tests
elcritch Sep 28, 2023
1b7866a
refactor
elcritch Sep 28, 2023
f587677
refactor - tests
elcritch Sep 28, 2023
9e946e6
refactor - tests
elcritch Sep 28, 2023
1c2c5f1
refactor - tests
elcritch Sep 28, 2023
5345550
refactor - tests
elcritch Sep 28, 2023
e778fdb
refactor - tests
elcritch Sep 28, 2023
d5850eb
refactor - tests
elcritch Sep 28, 2023
ade0898
refactor - tests
elcritch Sep 28, 2023
cac5d52
refactor - tests
elcritch Sep 28, 2023
d4a7549
refactor - tests
elcritch Sep 28, 2023
17337ea
refactor - tests
elcritch Sep 28, 2023
f0591a0
refactor - tests
elcritch Sep 28, 2023
16bd98d
refactor - tests
elcritch Sep 28, 2023
ae31185
refactor - tests
elcritch Sep 28, 2023
dcb70ca
refactor - tests
elcritch Sep 28, 2023
0f0e113
refactor - tests
elcritch Sep 28, 2023
fb73d6b
refactor - tests
elcritch Sep 28, 2023
1ef4825
refactor - tests
elcritch Sep 28, 2023
ea4790a
refactor - tests
elcritch Sep 28, 2023
58fcdd9
switch to two sqlites for now
elcritch Sep 28, 2023
e761957
switch to two sqlites for now
elcritch Sep 28, 2023
b7c9308
switch to two sqlites for now
elcritch Sep 28, 2023
8f2f06e
skip fsds for now
elcritch Sep 28, 2023
4b804d1
change threding lib
elcritch Sep 28, 2023
e6afc68
whats up with windows + questionable + generics
elcritch Sep 28, 2023
eba4033
whats up with windows + questionable + generics
elcritch Sep 28, 2023
e653364
whats up with windows + questionable + generics
elcritch Sep 28, 2023
f98432d
fixed thing
elcritch Sep 28, 2023
d2d051d
fixed thing
elcritch Sep 28, 2023
3822a97
fixed thing
elcritch Sep 28, 2023
46f22dc
test queries
elcritch Sep 28, 2023
bd79cb7
test task cancel
elcritch Sep 28, 2023
2481746
test task cancel
elcritch Sep 28, 2023
43520c3
test task cancel
elcritch Sep 28, 2023
47fd3ba
test task cancel
elcritch Sep 28, 2023
0df63d6
test task cancel
elcritch Sep 28, 2023
374e5a8
test task cancel
elcritch Sep 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datastore.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ license = "Apache License 2.0 or MIT"
requires "nim >= 1.6.14",
"asynctest >= 0.3.1 & < 0.4.0",
"chronos#0277b65be2c7a365ac13df002fba6e172be55537",
"questionable >= 0.10.3 & < 0.11.0",
"questionable#head",
"sqlite3_abi",
"stew",
"unittest2",
"pretty",
"threading",
"https://github.com/elcritch/threading#test-smartptrsleak",
"taskpools",
"upraises >= 0.1.0 & < 0.2.0",
"chronicles"
Expand Down
8 changes: 3 additions & 5 deletions datastore/backend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ proc dbQuery*[K](

proc `$`*(id: KeyId): string = $(id.data)

proc toKey*(tp: typedesc[KeyId], id: cstring): KeyId = KeyId.new(id)
proc toKey*(tp: typedesc[string], id: cstring): string = $(id)
proc toKey*(tp: typedesc[KeyId], id: string|cstring): KeyId = KeyId.new($id)
proc toKey*(tp: typedesc[string], id: string|cstring): string = $(id)
# proc toKey*(tp: typedesc[Key], id: string|cstring): KeyId = Key.init($id).expect("valid key")

# Convert an openArray[byte] payload into the backend's value representation.
# DataBuffer variant copies the bytes into a new buffer.
template toVal*(tp: typedesc[DataBuffer], id: openArray[byte]): DataBuffer = DataBuffer.new(id)
# seq[byte] variant materializes the openArray as a fresh seq.
template toVal*(tp: typedesc[seq[byte]], id: openArray[byte]): seq[byte] = @(id)
Expand All @@ -62,6 +63,3 @@ proc new*(tp: typedesc[KeyId], id: cstring): KeyId =

proc new*(tp: typedesc[KeyId], id: string): KeyId =
  ## Construct a `KeyId` by copying `id` into a `DataBuffer`.
  KeyId(data: DataBuffer.new(id))

template toOpenArray*(x: DbKey): openArray[char] =
x.data.toOpenArray(char)
201 changes: 110 additions & 91 deletions datastore/fsds.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,36 @@ import std/os
import std/options
import std/strutils

import pkg/chronos
import pkg/questionable
import pkg/questionable/results
from pkg/stew/results as stewResults import get, isErr
import pkg/upraises

import ./backend
import ./datastore

export datastore

push: {.upraises: [].}

type
FSDatastore* = ref object of Datastore
root*: string
FSDatastore*[K, V] = object
root*: DataBuffer
ignoreProtected: bool
depth: int

proc isRootSubdir*(root, path: string): bool =
  ## True when `path` begins with `root`.
  ## NOTE(review): plain string-prefix check — root "/data" would also match
  ## "/database"; assumes both arguments are absolute, normalized paths
  ## (callers build `path` via absolutePath) — confirm at call sites.
  path.startsWith(root)

proc validDepth*(self: FSDatastore, key: Key): bool =
  ## True when the key's segment count does not exceed the datastore's
  ## configured maximum directory depth (`self.depth`).
  key.len <= self.depth

proc isRootSubdir*(self: FSDatastore, path: string): bool =
  ## True when `path` begins with this datastore's root directory
  ## (string-prefix check only; see the free-standing overload's caveats).
  path.startsWith(self.root)

proc path*(self: FSDatastore, key: Key): ?!string =
proc findPath*[K,V](self: FSDatastore[K,V], key: K): ?!string =
## Return filename corresponding to the key
## or failure if the key doesn't correspond to a valid filename
##

let root = $self.root
let key = Key.init($key).get()
if not self.validDepth(key):
return failure "Path has invalid depth!"

Expand All @@ -53,22 +54,27 @@ proc path*(self: FSDatastore, key: Key): ?!string =
segments.add(ns.field / ns.value)

let
fullname = (self.root / segments.joinPath())
fullname = (root / segments.joinPath())
.absolutePath()
.catch()
.get()
.addFileExt(FileExt)

if not self.isRootSubdir(fullname):
if not root.isRootSubdir(fullname):
return failure "Path is outside of `root` directory!"

return success fullname

method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} =
return self.path(key).?fileExists()
proc has*[K,V](self: FSDatastore[K,V], key: K): ?!bool =
  ## Report whether a value is stored for `key`, i.e. whether the file the
  ## key maps to exists on disk. Fails when the key does not map to a
  ## valid path under the datastore root.
  without path =? self.findPath(key), error:
    return failure error
  success path.fileExists()

proc contains*[K](self: FSDatastore, key: K): bool =
  ## Convenience wrapper enabling `key in ds` syntax.
  ## NOTE(review): unwraps the Result with `.get()` — raises if `has`
  ## failed (e.g. invalid key path) rather than returning false; confirm
  ## that is the intended contract for `in`.
  return self.has(key).get()

method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
without path =? self.path(key), error:
proc delete*[K,V](self: FSDatastore[K,V], key: K): ?!void =
without path =? self.findPath(key), error:
return failure error

if not path.fileExists():
Expand All @@ -81,33 +87,39 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =

return success()

method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} =
proc delete*[K,V](self: FSDatastore[K,V], keys: openArray[K]): ?!void =
for key in keys:
if err =? (await self.delete(key)).errorOption:
if err =? self.delete(key).errorOption:
return failure err

return success()

proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
proc readFile[V](self: FSDatastore, path: string): ?!V =
var
file: File

defer:
file.close

if not file.open(path):
return failure "unable to open file!"
return failure "unable to open file! path: " & path

try:
let
size = file.getFileSize
size = file.getFileSize().int

when V is seq[byte]:
var bytes = newSeq[byte](size)
elif V is V:
var bytes = V.new(size=size)
else:
{.error: "unhandled result type".}
var
bytes = newSeq[byte](size)
read = 0

# echo "BYTES: ", bytes.repr
while read < size:
read += file.readBytes(bytes, read, size)
read += file.readBytes(bytes.toOpenArray(0, size-1), read, size)

if read < size:
return failure $read & " bytes were read from " & path &
Expand All @@ -118,61 +130,71 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
except CatchableError as e:
return failure e

method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
without path =? self.path(key), error:
proc get*[K,V](self: FSDatastore[K,V], key: K): ?!V =
without path =? self.findPath(key), error:
return failure error

if not path.fileExists():
return failure(
newException(DatastoreKeyNotFound, "Key doesn't exist"))

return self.readFile(path)
return readFile[V](self, path)

method put*(
self: FSDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
proc put*[K,V](self: FSDatastore[K,V],
key: K,
data: V
): ?!void =

without path =? self.path(key), error:
without path =? self.findPath(key), error:
return failure error

try:
var data = data
createDir(parentDir(path))
writeFile(path, data)
writeFile(path, data.toOpenArray(0, data.len()-1))
except CatchableError as e:
return failure e

return success()

method put*(
proc put*[K,V](
self: FSDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
batch: seq[DbBatchEntry[K, V]]): ?!void =

for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
if err =? self.put(entry.key, entry.data).errorOption:
return failure err

return success()

proc dirWalker(path: string): iterator: string {.gcsafe.} =
var localPath {.threadvar.}: string

localPath = path
return iterator(): string =
try:
for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
yield p
except CatchableError as exc:
raise newException(Defect, exc.msg)
iterator dirIter(path: string): string {.gcsafe.} =
  ## Yield every regular file under `path`, recursively, as a path
  ## relative to `path` (directories are not yielded).
  try:
    for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
      yield p
  except CatchableError as exc:
    # NOTE(review): converts recoverable filesystem errors into a Defect,
    # which callers cannot catch — confirm this escalation is intended.
    raise newException(Defect, exc.msg)

method close*(self: FSDatastore): Future[?!void] {.async.} =
proc close*[K,V](self: FSDatastore[K,V]): ?!void =
  ## No-op close: each operation opens and closes its own file handles,
  ## so there is nothing to release here. Always succeeds.
  return success()

method query*(
self: FSDatastore,
query: Query): Future[?!QueryIter] {.async.} =

without path =? self.path(query.key), error:
type
  FsQueryHandle*[K, V] = object
    ## Live state for an in-progress filesystem query.
    query*: DbQuery[K]   # the query being executed
    cancel*: bool        # set by the caller to stop iteration early
    closed*: bool        # true once close() has run; guards double-close
    env*: FsQueryEnv[K,V]

  FsQueryEnv*[K,V] = object
    ## Captured environment the query iterates over.
    self: FSDatastore[K,V]
    basePath: DataBuffer  # directory whose files are enumerated

proc query*[K,V](
self: FSDatastore[K,V],
query: DbQuery[K],
): Result[FsQueryHandle[K, V], ref CatchableError] =

let key = query.key
without path =? self.findPath(key), error:
return failure error

let basePath =
Expand All @@ -184,61 +206,58 @@ method query*(
path.parentDir
else:
path.changeFileExt("")

let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath))
success FsQueryHandle[K, V](query: query, env: env)

let
walker = dirWalker(basePath)

var
iter = QueryIter.new()
proc close*[K,V](handle: var FsQueryHandle[K,V]) =
  ## Mark the query handle as finished. Idempotent: a second call is a
  ## no-op. Holds no OS resources, so this only flips the flag.
  if not handle.closed:
    handle.closed = true

var lock = newAsyncLock() # serialize querying under threads
proc next(): Future[?!QueryResponse] {.async.} =
defer:
if lock.locked:
lock.release()

if lock.locked:
return failure (ref DatastoreError)(msg: "Should always await query features")

let
path = walker()
iterator queryIter*[K, V](
handle: var FsQueryHandle[K, V]
): ?!DbQueryResponse[K, V] =
let root = $(handle.env.self.root)
let basePath = $(handle.env.basePath)

if iter.finished:
return failure "iterator is finished"

await lock.acquire()

if finished(walker):
iter.finished = true
return success (Key.none, EmptyBytes)
for path in basePath.dirIter():
if handle.cancel:
break

var
basePath = $handle.env.basePath
keyPath = basePath

keyPath.removePrefix(self.root)
keyPath.removePrefix(root)
keyPath = keyPath / path.changeFileExt("")
keyPath = keyPath.replace("\\", "/")

let
key = Key.init(keyPath).expect("should not fail")
flres = (basePath / path).absolutePath().catch
if flres.isErr():
yield DbQueryResponse[K,V].failure flres.error()
continue

let
key = K.toKey($Key.init(keyPath).expect("valid key"))
data =
if query.value:
self.readFile((basePath / path).absolutePath)
.expect("Should read file")
if handle.query.value:
let res = readFile[V](handle.env.self, flres.get)
if res.isErr():
yield DbQueryResponse[K,V].failure res.error()
continue
res.get()
else:
@[]

return success (key.some, data)
V.new()

iter.next = next
return success iter
yield success (key.some, data)
handle.close()

proc new*(
T: type FSDatastore,
root: string,
depth = 2,
caseSensitive = true,
ignoreProtected = false): ?!T =
proc newFSDatastore*[K,V](root: string,
depth = 2,
caseSensitive = true,
ignoreProtected = false
): ?!FSDatastore[K,V] =

let root = ? (
block:
Expand All @@ -248,7 +267,7 @@ proc new*(
if not dirExists(root):
return failure "directory does not exist: " & root

success T(
root: root,
success FSDatastore[K,V](
root: DataBuffer.new root,
ignoreProtected: ignoreProtected,
depth: depth)
9 changes: 4 additions & 5 deletions datastore/sql.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ method put*(self: SQLiteDatastore,
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()

method queryIter*(
self: SQLiteDatastore,
query: Query
): ?!(iterator(): ?!QueryResponse) =
method queryIter*(self: SQLiteDatastore,
query: Query
): ?!(iterator(): ?!QueryResponse) =

let dbquery = dbQuery(
key= KeyId.new query.key.id(),
Expand All @@ -75,7 +74,7 @@ method queryIter*(
var qhandle = ? self.db.query(dbquery)

let iter = iterator(): ?!QueryResponse =
for resp in qhandle.iter():
for resp in qhandle.queryIter():
without qres =? resp, err:
yield QueryResponse.failure err
let k = qres.key.map() do(k: KeyId) -> Key:
Expand Down
Loading