Skip to content

Test concurrent upserts on a single table #689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ test-suite lsm-tree-test
Test.Database.LSMTree.Internal.WriteBufferBlobs.FS
Test.Database.LSMTree.Internal.WriteBufferReader.FS
Test.Database.LSMTree.Model.Table
Test.Database.LSMTree.Parallel
Test.Database.LSMTree.Resolve
Test.Database.LSMTree.StateMachine
Test.Database.LSMTree.StateMachine.DL
Expand Down
13 changes: 12 additions & 1 deletion src/Database/LSMTree/Internal/Primitive.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
{-# OPTIONS_HADDOCK not-home #-}

module Database.LSMTree.Internal.Primitive (
indexWord8ArrayAsWord16
byteSwapInt
, indexWord8ArrayAsInt
, indexWord8ArrayAsWord16
, indexWord8ArrayAsWord32
, indexWord8ArrayAsWord64
) where
Expand All @@ -12,6 +14,15 @@ import Data.Primitive.ByteArray (ByteArray (..))
import GHC.Exts
import GHC.Word

{-# INLINE byteSwapInt #-}
byteSwapInt :: Int -> Int
byteSwapInt (I# i#) = I# (word2Int# (byteSwap# (int2Word# i#)))

{-# INLINE indexWord8ArrayAsInt #-}
indexWord8ArrayAsInt :: ByteArray -> Int -> Int
indexWord8ArrayAsInt (ByteArray !ba#) (I# !off#) =
I# (indexWord8ArrayAsInt# ba# off#)

{-# INLINE indexWord8ArrayAsWord16 #-}
indexWord8ArrayAsWord16 :: ByteArray -> Int -> Word16
indexWord8ArrayAsWord16 (ByteArray !ba#) (I# !off#) =
Expand Down
18 changes: 17 additions & 1 deletion src/Database/LSMTree/Internal/Serialise/Class.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import qualified Data.Vector.Primitive as VP
import Data.Void (Void, absurd)
import Data.Word
import Database.LSMTree.Internal.ByteString (byteArrayToSBS)
import Database.LSMTree.Internal.Primitive (indexWord8ArrayAsWord64)
import Database.LSMTree.Internal.Primitive
import Database.LSMTree.Internal.RawBytes (RawBytes (..))
import qualified Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.Vector
Expand Down Expand Up @@ -135,6 +135,22 @@ requireBytesExactly tyName expected actual x
. showInt actual
$ ""

{-------------------------------------------------------------------------------
Int
-------------------------------------------------------------------------------}

instance SerialiseKey Int where
serialiseKey x = RB.RawBytes $ byteVectorFromPrim $ byteSwapInt x

deserialiseKey (RawBytes (VP.Vector off len ba)) =
requireBytesExactly "Int" 8 len $ byteSwapInt (indexWord8ArrayAsInt ba off)

instance SerialiseValue Int where
serialiseValue x = RB.RawBytes $ byteVectorFromPrim $ x

deserialiseValue (RawBytes (VP.Vector off len ba)) =
requireBytesExactly "Int" 8 len $ indexWord8ArrayAsInt ba off

{-------------------------------------------------------------------------------
Word64
-------------------------------------------------------------------------------}
Expand Down
2 changes: 2 additions & 0 deletions test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import qualified Test.Database.LSMTree.Internal.Vector.Growing
import qualified Test.Database.LSMTree.Internal.WriteBufferBlobs.FS
import qualified Test.Database.LSMTree.Internal.WriteBufferReader.FS
import qualified Test.Database.LSMTree.Model.Table
import qualified Test.Database.LSMTree.Parallel
import qualified Test.Database.LSMTree.Resolve
import qualified Test.Database.LSMTree.StateMachine
import qualified Test.Database.LSMTree.StateMachine.DL
Expand Down Expand Up @@ -89,6 +90,7 @@ main = do
, Test.Database.LSMTree.Internal.WriteBufferBlobs.FS.tests
, Test.Database.LSMTree.Internal.WriteBufferReader.FS.tests
, Test.Database.LSMTree.Model.Table.tests
, Test.Database.LSMTree.Parallel.tests
, Test.Database.LSMTree.Resolve.tests
, Test.Database.LSMTree.UnitTests.tests
, Test.Database.LSMTree.StateMachine.tests
Expand Down
3 changes: 2 additions & 1 deletion test/Test/Database/LSMTree/Internal/Serialise/Class.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import Test.Tasty.QuickCheck

tests :: TestTree
tests = testGroup "Test.Database.LSMTree.Internal.Serialise.Class"
[ testGroup "Word64" (allProperties @Word64 True)
[ testGroup "Int" (allProperties @Int False)
, testGroup "Word64" (allProperties @Word64 True)
, testGroup "ByteString" (allProperties @ByteString True)
, testGroup "LazyByteString" (allProperties @LazyByteString True)
, testGroup "ShortByteString" (allProperties @ShortByteString True)
Expand Down
85 changes: 85 additions & 0 deletions test/Test/Database/LSMTree/Parallel.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
module Test.Database.LSMTree.Parallel (tests) where

import Control.Monad.Class.MonadAsync
import Control.Tracer
import qualified Data.Map.Strict as Map
import Data.Semigroup
import qualified Data.Vector as V
import Data.Void
import Database.LSMTree
import qualified System.FS.API as FS
import Test.Database.LSMTree.UnitTests (ignoreBlobRef)
import Test.Tasty
import Test.Tasty.QuickCheck
import Test.Util.FS

tests :: TestTree
tests = testGroup "Test.Database.LSMTree.Parallel" [
testProperty "prop_concurrentUpserts" $
forAllShrink genTinyAllocNumEntries shrinkTinyAllocNumEntries
prop_concurrentUpserts
]

{-------------------------------------------------------------------------------
Concurrent upserts on one table
-------------------------------------------------------------------------------}

prop_concurrentUpserts ::
WriteBufferAlloc
-> ParallelShrink
-> V.Vector (Key, Value)
-> [V.Vector (Key, Value)]
-> V.Vector Key
-> Property
prop_concurrentUpserts wbAlloc (ParallelShrink n) setupBatch parBatches lookupBatch =
conjoin $ replicate n $
ioProperty $
withTempIOHasBlockIO "prop_concurrentUpserts" $ \hfs hbio ->
withSession nullTracer hfs hbio (FS.mkFsPath []) $ \sess ->
withTableWith @_ @Key @Value @Blob conf sess $ \table -> do
upserts table setupBatch
forConcurrently_ parBatches $ upserts table
vs <- lookups table lookupBatch
pure $ V.map modelLookup lookupBatch === V.map ignoreBlobRef vs
where
conf = defaultTableConfig { confWriteBufferAlloc = wbAlloc }

modelTable =
let ms = fromBatch setupBatch : fmap fromBatch parBatches
in Map.unionsWith resolve ms
where
fromBatch = Map.fromListWith resolve . V.toList

modelLookup k = case Map.lookup k modelTable of
Nothing -> NotFound
Just v -> Found v

newtype Key = Key Int
deriving stock (Show, Eq, Ord)
deriving Arbitrary via Small Int
deriving newtype SerialiseKey

newtype Value = Value Int
deriving stock (Show, Eq, Ord)
deriving newtype SerialiseValue
deriving ResolveValue via ResolveViaSemigroup (Sum Int)
deriving Arbitrary via Int

newtype Blob = Blob Void
deriving newtype SerialiseValue

genTinyAllocNumEntries :: Gen WriteBufferAlloc
genTinyAllocNumEntries = AllocNumEntries <$> elements [1..5]

shrinkTinyAllocNumEntries :: WriteBufferAlloc -> [WriteBufferAlloc]
shrinkTinyAllocNumEntries (AllocNumEntries x) =
[ AllocNumEntries x' | Positive x' <- shrink (Positive x), x' >= 2]

newtype ParallelShrink = ParallelShrink Int
deriving stock Show

instance Arbitrary ParallelShrink where
arbitrary = pure (ParallelShrink 1)
shrink (ParallelShrink n)
| n == 1 = [ParallelShrink 100]
| otherwise = []
6 changes: 5 additions & 1 deletion test/Test/Database/LSMTree/UnitTests.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}

module Test.Database.LSMTree.UnitTests (tests) where
module Test.Database.LSMTree.UnitTests (
tests
-- * Utilities
, ignoreBlobRef
) where

import Control.Tracer (nullTracer)
import Data.ByteString (ByteString)
Expand Down