-
Notifications
You must be signed in to change notification settings - Fork 3
Threaded IO datastore with minimal modifications #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
dryajov
wants to merge
173
commits into
master
Choose a base branch
from
threadpool-semaphore
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
173 commits
Select commit
Hold shift + click to select a range
b322d0b
setup the path correctly
dryajov 030dc9e
fix conflicting testing symbols
dryajov 542fdf2
initial data buffer
elcritch e81c5fa
add helpers
elcritch da0eafe
setup shared datastore
elcritch 57450fe
make blank sharedds
elcritch e592aaf
updating items
elcritch edc9321
updates
elcritch d39c096
setting up thread backend
elcritch d09ef9b
plumbing
elcritch 055edb4
plumbing values
elcritch 5ef4196
plumbing values
elcritch 3bde00d
plumbing values
elcritch a0d60cc
reworking results and tasks
elcritch 972ee22
more changes
elcritch ec4b442
more changes
elcritch 368c251
more changes
elcritch 2809788
getting somewhere now
elcritch 8644cbc
fixes
elcritch 567e0d7
fixes
elcritch 321a4b6
cleanup
elcritch 567f84e
cleanup
elcritch b48e45c
adding testing
elcritch 0daf168
adding testing
elcritch d5fe24c
adding testing
elcritch 2e9695e
adding testing
elcritch 464896b
adding test backend
elcritch 6c5059e
fixing tp setup
elcritch 0790cca
setup sharedptr
elcritch a9e9076
updates
elcritch ab850d4
updates
elcritch a3cff98
updates
elcritch 8ab5da0
create backend
elcritch 3da60e0
add threadid
elcritch f66531d
passing put args
elcritch 0b2e016
adding sharedds get
elcritch 6c133a2
cleanup
elcritch 86a7085
switching up
elcritch aa01665
switching up errors
elcritch 35c4466
switching up errors
elcritch 727f8b6
properly close signal even on cancel
elcritch 0d4e6f2
switch to smartptrs
elcritch 768de3b
cleanup
elcritch 89f43f5
more cleanup
elcritch b90eff2
cleanup
elcritch a359edb
cleanup
elcritch d60bd35
rework ds
elcritch 88e26f9
rework ds
elcritch d70e0df
rework ds
elcritch 904d65b
add memory (test) ds
elcritch 6399534
add memory (test) ds
elcritch 9ee931b
add memory (test) ds
elcritch de406a5
add memory (test) ds
elcritch 9b29a00
add memory (test) ds
elcritch 170d50d
add memory (test) ds
elcritch 985697b
add memory (test) ds
elcritch a7f6794
add memory (test) ds
elcritch 23423e5
add memory (test) ds
elcritch 344ba6a
basic datstore checks
elcritch a11a681
basic datstore checks
elcritch 8445920
passing basic ds tests
elcritch 84986c7
sorting keys - fix
elcritch 18e96b6
cleanup
elcritch f318877
cleanup
elcritch b27593e
cleanup
elcritch 51683be
cleanup
elcritch 03830ce
cleanup
elcritch a68ce94
test setup
elcritch 6364c83
test setup
elcritch 40cc03e
updates
elcritch 22c9d85
rename sharedds to threadproxyds
elcritch 2cded13
impl delete
elcritch 9fa6c6e
impl has
elcritch db666f9
impl has
elcritch c21532b
impl has
elcritch 11267c8
first impl of batch ops
elcritch a2c9a7e
various cleanup
elcritch 1e6fc45
don't test 1.2
elcritch 485f2ec
cleanup
elcritch 9e0b976
change TResult to case type
elcritch 5f13765
switch to result type
elcritch 7a3c64d
switch to result type
elcritch 9d6e6ae
cleanup
elcritch e52f794
add querybuffer type
elcritch a2f7745
add querybuffer type
elcritch 3f8c471
add querybuffer type
elcritch 120f770
implementing query type
elcritch 14c3947
implementing query type
elcritch f1d2a0e
implementing query type
elcritch 428b3e6
implementing query type
elcritch 2979ae0
implementing query type
elcritch 9ea1b58
implementing query type
elcritch 76ec7e7
implementing query type
elcritch 2c881d9
cleanup
elcritch 3f3e4b8
update nim
elcritch 64434b7
stylechecks workaround
elcritch da83b04
remove stylechecks workaround
elcritch a95cd9c
bump required nim version
elcritch b1a5b9c
implementing query type
elcritch 52286b8
query iterator using items is breaks when the DS isn't blocking
elcritch ed508b4
query iterator using items is breaks when the DS isn't blocking
elcritch 9b004cd
query iterator using items is breaks when the DS isn't blocking
elcritch 8fccc77
query iterator using items is breaks when the DS isn't blocking
elcritch 3a9ee98
query iterator using items is breaks when the DS isn't blocking
elcritch 221d193
query iterator using items is breaks when the DS isn't blocking
elcritch 2d2b663
compiler really doesn't like this
elcritch 1f00125
fix compiler issue -- wasn't detecting discard on result correctly
elcritch 0caa606
fix test
elcritch 619d3e9
add tests for new ds'es
elcritch 24ce85e
add tests for new ds'es
elcritch f2bfe7a
add tests for new ds'es
elcritch 2a04a36
add tests for new ds'es
elcritch 1760ae2
set manual version of chronos
elcritch e84ccb2
add taskpools
elcritch 6c3b0d6
fix unitest2 import
dryajov 146cbcb
wip
dryajov b7454d6
foreign buffer
dryajov 9bbf3ed
use shared table
dryajov 13e89bc
adding semaphore for async backpresure
dryajov 184420c
re-adding databuf
dryajov 5adc7c9
this would never work
dryajov 4c48383
reworked with less copying
dryajov 2829ac8
remove useless async annotation
dryajov d6c4d97
add $ operator
dryajov 19954c6
quick and dirty query
dryajov 776c58f
re-added query and added raises (do they work?)
dryajov 7a9bc11
make concurrent (but don't need anymore)
dryajov 6a3882f
reverted query back (it works as is)
dryajov 88eb96e
don't use toSeq
dryajov 6c86d2b
make sure all tests pass
dryajov f003812
remove asyncsemaphore
dryajov af31030
enable sqllite threading tests
dryajov 3500913
adding semaphore
dryajov 221e93f
remove asyncsemaphore
dryajov e39bd53
wip
dryajov 812b4fd
use foreach iterator
dryajov 6d5c471
add sqlite query tests
dryajov a96de2c
fix
dryajov 59f1f0b
add semaphore tests
dryajov 25678b2
change iter constructore back to new
dryajov a17e9fe
remove memdb (not needed)
dryajov b2cb1fd
remove memdb tests
dryajov 0fde28a
move assert into lock
dryajov 71c704f
adding todo
dryajov 600dca6
adding serialization to query iter
dryajov 783ecc3
fix databuffer tests
dryajov 7f6d95b
adding databuffer tests
dryajov 20d7234
re-enable missing key check
dryajov 1713c76
adding back async semaphore
dryajov d151c01
enable cancellations
dryajov 9e27eec
adding chronicles and generating lock file
dryajov f6acaa6
add async semaphore tests
dryajov bee79ff
added (ugly!) locking capabilities
dryajov 84681cd
make all tests pass
dryajov f849c1c
re-export threaded datastore
dryajov 52d6a85
fixing dumb error in trace
dryajov 7306a0b
simplify locking
dryajov ed09b9c
re-enable tests
dryajov 75fa37f
avoid duplicating code
dryajov 2c5186e
use baseAddr
dryajov d5a1b34
remove ptr to Datastore in TaskCtx, it's a ref
dryajov 3d33820
change logscope
dryajov 0beb3d3
handle error passing and conversion better
dryajov bb304a2
fix broken key not found test
dryajov 181168d
fix tired ds
dryajov 14f8c3a
check for nil ctx and set iter.finished correctly
dryajov 7ceccf9
get rid of unsafeAddr everywhere
dryajov 2eb120b
make path threadvar
dryajov 81372c9
duh
dryajov 3d84781
copy data and keys to thread local gc
dryajov 9013a0c
GC_fullcollect() in tests to check mem consistency
dryajov 215d334
avoid segfault on fullcollect
dryajov 4cd74af
add arc/orc support
dryajov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,3 +9,6 @@ coverage | |
datastore.nims | ||
nimcache | ||
TODO | ||
nim.cfg | ||
nimble.develop | ||
nimble.paths |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# Nim-Libp2p | ||
# Copyright (c) 2023 Status Research & Development GmbH | ||
# Licensed under either of | ||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) | ||
# * MIT license ([LICENSE-MIT](LICENSE-MIT)) | ||
# at your option. | ||
# This file may not be copied, modified, or distributed except according to | ||
# those terms. | ||
|
||
{.push raises: [].} | ||
|
||
import sequtils | ||
import chronos, chronicles | ||
|
||
# TODO: this should probably go in chronos | ||
|
||
logScope: | ||
topics = "datastore semaphore" | ||
|
||
type | ||
AsyncSemaphore* = ref object of RootObj | ||
size*: int | ||
count: int | ||
queue: seq[Future[void]] | ||
|
||
func new*(_: type AsyncSemaphore, size: int): AsyncSemaphore = | ||
AsyncSemaphore(size: size, count: size) | ||
|
||
proc `count`*(s: AsyncSemaphore): int = s.count | ||
|
||
proc tryAcquire*(s: AsyncSemaphore): bool = | ||
## Attempts to acquire a resource, if successful | ||
## returns true, otherwise false | ||
## | ||
|
||
if s.count > 0 and s.queue.len == 0: | ||
s.count.dec | ||
trace "Acquired slot", available = s.count, queue = s.queue.len | ||
return true | ||
|
||
proc acquire*(s: AsyncSemaphore): Future[void] = | ||
## Acquire a resource and decrement the resource | ||
## counter. If no more resources are available, | ||
## the returned future will not complete until | ||
## the resource count goes above 0. | ||
## | ||
|
||
let fut = newFuture[void]("AsyncSemaphore.acquire") | ||
if s.tryAcquire(): | ||
fut.complete() | ||
return fut | ||
|
||
proc cancellation(udata: pointer) {.gcsafe.} = | ||
fut.cancelCallback = nil | ||
if not fut.finished: | ||
s.queue.keepItIf( it != fut ) | ||
|
||
fut.cancelCallback = cancellation | ||
|
||
s.queue.add(fut) | ||
|
||
trace "Queued slot", available = s.count, queue = s.queue.len | ||
return fut | ||
|
||
proc forceAcquire*(s: AsyncSemaphore) = | ||
## ForceAcquire will always succeed, | ||
## creating a temporary slot if required. | ||
## This temporary slot will stay usable until | ||
## there is less `acquire`s than `release`s | ||
s.count.dec | ||
|
||
proc release*(s: AsyncSemaphore) = | ||
## Release a resource from the semaphore, | ||
## by picking the first future from the queue | ||
## and completing it and incrementing the | ||
## internal resource count | ||
## | ||
|
||
doAssert(s.count <= s.size) | ||
|
||
if s.count < s.size: | ||
trace "Releasing slot", available = s.count, | ||
queue = s.queue.len | ||
|
||
s.count.inc | ||
while s.queue.len > 0: | ||
var fut = s.queue[0] | ||
s.queue.delete(0) | ||
if not fut.finished(): | ||
s.count.dec | ||
fut.complete() | ||
break | ||
|
||
trace "Released slot", available = s.count, | ||
queue = s.queue.len | ||
return |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import threading/smartptrs | ||
import std/hashes | ||
import pkg/stew/ptrops | ||
|
||
export hashes | ||
|
||
type | ||
DataBufferHolder* = object | ||
buf: ptr UncheckedArray[byte] | ||
size: int | ||
|
||
DataBuffer* = SharedPtr[DataBufferHolder] ##\ | ||
## A fixed length data buffer using a SharedPtr. | ||
## It is thread safe even with `refc` since | ||
## it doesn't use string or seq types internally. | ||
## | ||
|
||
proc `=destroy`*(x: var DataBufferHolder) = | ||
## copy pointer implementation | ||
## | ||
|
||
if x.buf != nil: | ||
# echo "buffer: FREE: ", repr x.buf.pointer | ||
deallocShared(x.buf) | ||
|
||
proc len*(a: DataBuffer): int = a[].size | ||
|
||
proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a) | ||
|
||
proc hash*(a: DataBuffer): Hash = | ||
a[].buf.toOpenArray(0, a[].size-1).hash() | ||
|
||
proc `==`*(a, b: DataBuffer): bool = | ||
if a.isNil and b.isNil: return true | ||
elif a.isNil or b.isNil: return false | ||
elif a[].size != b[].size: return false | ||
elif a[].buf == b[].buf: return true | ||
else: a.hash() == b.hash() | ||
|
||
proc new*(tp: type DataBuffer, size: int = 0): DataBuffer = | ||
## allocate new buffer with given size | ||
## | ||
|
||
newSharedPtr(DataBufferHolder( | ||
buf: cast[typeof(result[].buf)](allocShared0(size)), | ||
size: size, | ||
)) | ||
|
||
proc new*[T: byte | char](tp: type DataBuffer, data: openArray[T]): DataBuffer = | ||
## allocate new buffer and copies indata from openArray | ||
## | ||
result = DataBuffer.new(data.len) | ||
if data.len() > 0: | ||
# TODO: we might want to copy data, otherwise the GC might | ||
# release it on stack-unwind | ||
copyMem(result[].buf, baseAddr data, data.len) | ||
|
||
converter toSeq*(self: DataBuffer): seq[byte] = | ||
## convert buffer to a seq type using copy and either a byte or char | ||
## | ||
|
||
result = newSeq[byte](self.len) | ||
if self.len() > 0: | ||
copyMem(addr result[0], addr self[].buf[0], self.len) | ||
|
||
proc `@`*(self: DataBuffer): seq[byte] = | ||
## Convert a buffer to a seq type using copy and | ||
## either a byte or char | ||
## | ||
|
||
self.toSeq() | ||
|
||
converter toString*(data: DataBuffer): string = | ||
## convert buffer to string type using copy | ||
## | ||
|
||
if data.isNil: return "" | ||
result = newString(data.len()) | ||
if data.len() > 0: | ||
copyMem(addr result[0], addr data[].buf[0], data.len) | ||
|
||
proc `$`*(data: DataBuffer): string = | ||
## convert buffer to string type using copy | ||
## | ||
|
||
data.toString() | ||
|
||
converter toBuffer*(err: ref CatchableError): DataBuffer = | ||
## convert exception to an object with StringBuffer | ||
## | ||
|
||
return DataBuffer.new(err.msg) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good! I was thinking about that this weekend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, but maybe this should also be switched to
baseAddr
? Not sure.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or the other way around, switch
baseAddr
toaddr
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imo,
addr
is probably safer,baseAddr
does a straight upcast
.