Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scls-format/scls-format.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ library
Cardano.SCLS.Internal.Record.Hdr
Cardano.SCLS.Internal.Record.Internal.Class
Cardano.SCLS.Internal.Record.Manifest
Cardano.SCLS.Internal.Serializer.Builder.InMemory
Cardano.SCLS.Internal.Serializer.ChunksBuilder.InMemory
Cardano.SCLS.Internal.Serializer.External.Impl
Cardano.SCLS.Internal.Serializer.HasKey
Expand Down
4 changes: 4 additions & 0 deletions scls-format/src/Cardano/SCLS/Internal/Entry.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Cardano.SCLS.Internal.Entry (
) where

import Cardano.SCLS.Internal.Serializer.HasKey
import Cardano.SCLS.Internal.Serializer.MemPack (MemPackHeaderOffset (..))
import Cardano.Types.ByteOrdered (BigEndian (..))
import Data.MemPack
import Data.MemPack.Buffer
Expand Down Expand Up @@ -41,6 +42,9 @@ instance (Typeable k, IsKey k, MemPack v, Typeable v) => MemPack (ChunkEntry k v
v <- unpackM
return (ChunkEntry k v)

-- Number of leading bytes of a packed 'ChunkEntry' that are treated as header.
-- NOTE(review): presumably 4 is the byte width of the length prefix written in
-- front of every packed entry — confirm against the packing code.
instance (Typeable k, IsKey k, Typeable v, MemPack v) => MemPackHeaderOffset (ChunkEntry k v) where
  headerSizeOffset = 4

-- Two chunk entries are equal exactly when their keys are equal and their
-- values are equal; the key is compared first.
instance (Eq k, Eq v) => Eq (ChunkEntry k v) where
  ChunkEntry keyA valueA == ChunkEntry keyB valueB = (keyA, valueA) == (keyB, valueB)

Expand Down
185 changes: 185 additions & 0 deletions scls-format/src/Cardano/SCLS/Internal/Serializer/Builder/InMemory.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}

{- |
Implementation of the state machine that fills current chunk in memory.

It manages proper filling of the buffers and emitting the values when
the next item can't be added.

Current implementation expects the incoming values in already sorted order.

The implementation is designed so that it can be used with any existing
stream or effect system, as long as that system is able to carry a state.
-}
module Cardano.SCLS.Internal.Serializer.Builder.InMemory (
BuilderItem (..),
mkMachine,
Command (..),
BuilderMachine (..),
) where

import Cardano.SCLS.Internal.Hash
import Cardano.SCLS.Internal.Serializer.MemPack
import Control.Monad.Primitive
import Crypto.Hash (Blake2b_224 (Blake2b_224))
import Crypto.Hash.MerkleTree.Incremental qualified as MT
import Data.MemPack

import Data.Kind (Type)
import Data.Primitive.ByteArray
import Data.Typeable
import Foreign.Ptr
import Unsafe.Coerce (unsafeCoerce)

{- | Typeclass for items that can be emitted by the builder state machine.

An instance ties an item type to the parameters needed to build it, and
describes how an item is assembled from a filled buffer and an entry count.
-}
class BuilderItem item where
  -- | Type of parameters needed to encode entries and build the item
  type Parameters item :: Type

  -- | Get the data payload of the item
  bItemData :: item -> ByteArray

  -- | Get the number of entries contained in the item
  bItemEntriesCount :: item -> Int

  -- | Construct an item from build parameters, data and entry count
  bMkItem :: Parameters item -> ByteArray -> Int -> item

  -- | Encode an entry for the item using the provided parameters.
  -- The result has the same type as the input entry.
  bEncodeEntry :: (MemPack a, Typeable a) => Parameters item -> a -> a

{- | Command for the state machine.

The second type parameter is the type of the result produced when the
command is interpreted.
-}
data Command item type_ where
  {- | Append a new value to the buffer.

  The result is the next state of the machine together with the list of
  items (possibly empty) that had to be emitted to make room for the value.
  -}
  Append :: (MemPack u, Typeable u, MemPackHeaderOffset u) => u -> Command item (BuilderMachine item, [item])
  {- | Finalize building of the buffer.

  The result is a digest together with an optional last item.

  It's up to the implementation if the state machine can be used
  after interpreting this command.
  -}
  Finalize :: Command item (Digest, Maybe item)

{- | State machine for building items in memory.

Basically it's an interpreter for the 'Command' type, implemented in such a
way that it can be plugged into different streaming pipelines or effect
libraries.
-}
newtype BuilderMachine item = BuilderMachine
  { interpretCommand :: forall result. Command item result -> IO result
  -- ^ Interpret a single command in 'IO'; the result type is fixed by the command.
  }

{- | Create an instance of the state machine.

The machine owns one pinned mutable buffer of @bufferSize@ bytes that is
reused for every item; data is always copied before it leaves the machine.

Handling of 'Append':

  * if the encoded entry fits into the remaining space, it is written to the
    buffer and nothing is emitted;
  * otherwise the current buffer contents are emitted as an item (unless no
    entries were buffered) and the entry is written at the start of the buffer;
  * if the entry is larger than the whole buffer, it is instead packed into
    its own array and emitted as an item holding exactly one entry.

'Finalize' returns the Merkle root over everything that was appended,
together with the not yet emitted buffer contents, if there are any.
-}
mkMachine ::
  forall item.
  (BuilderItem item) =>
  -- | Buffer size in bytes
  Int ->
  -- | Parameters used to encode entries and to build the emitted items
  Parameters item ->
  IO (BuilderMachine item)
mkMachine bufferSize params = do
  -- This buffer is reused for all the items. As a result we copy its
  -- contents whenever we emit data outside of the state machine.
  --
  -- We allocate pinned memory because we pass it to the digest code
  -- without copying by passing raw pointer.
  storage <- newPinnedByteArray bufferSize

  -- Each state of the machine is a closure over the number of buffered
  -- entries, the write offset and the running Merkle tree state.
  let machine (!entriesCount :: Int) (!offset :: Int) !merkleTreeState =
        BuilderMachine
          { interpretCommand = \case
              Finalize -> do
                let final = Digest $ MT.merkleRootHash $ MT.finalize merkleTreeState
                if offset == 0 -- no new data, nothing to emit
                  then pure (final, Nothing)
                  else do
                    frozenData <- freezeByteArrayPinned storage 0 offset
                    pure (final, Just (bMkItem params frozenData entriesCount))
              Append @a input -> do
                let entry = Entry $ bEncodeEntry @item params input
                let l = packedByteCount entry
                if offset + l <= bufferSize -- if we fit the current buffer we just need to write data and continue
                  then do
                    (merkleTreeState', newOffset) <-
                      unsafeAppendEntryToBuffer merkleTreeState storage offset entry
                    pure (machine (entriesCount + 1) newOffset merkleTreeState', [])
                  else do
                    -- We have no space in the current buffer, so we need to emit it first
                    frozenBuffer <- freezeByteArrayPinned storage 0 offset
                    if l > bufferSize
                      then do
                        -- The entry can never fit into the shared buffer, so it gets an
                        -- array of its own. That array must be pinned: its raw address is
                        -- handed to the digest code below, and plain 'pack' gives no
                        -- pinning guarantee.
                        let !tmpBuffer = packByteArray True entry
                            !merkleTreeState' = MT.add merkleTreeState (uncheckedByteArrayEntryContents @a tmpBuffer)
                        return
                          ( machine 0 0 merkleTreeState'
                          , mkDataToEmit [(params, frozenBuffer, entriesCount), (params, tmpBuffer, 1)]
                          )
                      else do
                        -- The entry fits into an empty buffer: start over at offset 0.
                        (merkleTreeState', newOffset) <-
                          unsafeAppendEntryToBuffer merkleTreeState storage 0 entry
                        pure
                          ( machine 1 newOffset merkleTreeState'
                          , mkDataToEmit [(params, frozenBuffer, entriesCount)]
                          )
          }
  return $! machine 0 0 (MT.empty Blake2b_224)

{- | Copy a slice of a mutable byte array into a freshly allocated pinned
immutable byte array.

The arguments are the source array, the start offset within it and the
number of bytes to copy. Because the contents are copied, the source array
can still be used after this operation.
-}
freezeByteArrayPinned :: (PrimMonad m) => MutableByteArray (PrimState m) -> Int -> Int -> m ByteArray
freezeByteArrayPinned !source !start !count =
  newPinnedByteArray count >>= \target ->
    copyMutableByteArray target 0 source start count
      >> unsafeFreezeByteArray target

{- | Pack an entry into the buffer at the given offset and add its contents
to the Merkle tree state.

The first @headerSizeOffset \@u@ bytes of the packed entry are skipped when
feeding the Merkle tree. Returns the updated tree state and the offset just
past the written entry.

The function does not check that the entry fits into the buffer.
-}
unsafeAppendEntryToBuffer ::
  forall u.
  (MemPack u, Typeable u, MemPackHeaderOffset u) =>
  MT.MerkleTreeState Blake2b_224 ->
  MutableByteArray (PrimState IO) ->
  Int ->
  Entry u ->
  IO (MT.MerkleTreeState Blake2b_224, Int)
unsafeAppendEntryToBuffer !treeState !buffer !start entry = do
  end <- unsafeAppendToBuffer buffer start entry
  let header = headerSizeOffset @u
      payloadLength = end - start - header
  -- The pointer is only valid inside the callback, so the tree state is
  -- forced there before it is returned.
  treeState' <- withMutableByteArrayContents buffer $ \base ->
    pure $! MT.add treeState (CStringLenBuffer (base `plusPtr` (start + header), payloadLength))
  pure (treeState', end)

{- | View the contents of a packed entry, i.e. everything after the first
@headerSizeOffset \@a@ bytes of the array, as a 'CStringLenBuffer'.

The array must be pinned, because its raw address is used; the function does
not enforce this.
-}
uncheckedByteArrayEntryContents :: forall a. (MemPackHeaderOffset a) => ByteArray -> CStringLenBuffer
uncheckedByteArrayEntryContents !bytes = CStringLenBuffer (payloadStart, payloadLength)
 where
  header = headerSizeOffset @a
  payloadStart = byteArrayContents bytes `plusPtr` header
  payloadLength = sizeofByteArray bytes - header

{- | Pack a value into the buffer starting at the given offset and return the
offset just past the written bytes.

This helper exists because the MemPack interface only allows 'ST' and no
other 'PrimMonad'.

It carries the unsafe prefix because it uses 'unsafeCoerce' internally to
hand the 'IO' buffer to the packing code that runs in 'ST'.

NOTE(review): any length prefix is presumably produced by the 'MemPack'
instance of the value being packed, not by this function — confirm.
-}
unsafeAppendToBuffer :: (MemPack u) => MutableByteArray (PrimState IO) -> Int -> u -> IO Int
unsafeAppendToBuffer !buffer !start value =
  stToPrim $ do
    (_, end) <- runStateT (runPack (packM value) (unsafeCoerce buffer)) start
    pure end

{- | Build the list of items to emit from a list of
(parameters, data, entry count) tuples.

Tuples with an entry count of 0 are dropped; the order of the remaining
ones is preserved.
-}
mkDataToEmit :: (BuilderItem item) => [(Parameters item, ByteArray, Int)] -> [item]
mkDataToEmit candidates =
  [ bMkItem params bytes count
  | (params, bytes, count) <- candidates
  , count /= 0
  ]
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}

{- |
Implementation of the state machine that fills current chunk in memory.
Expand All @@ -15,159 +16,34 @@ any existing stream and effect system as long as they could carry a state.
-}
module Cardano.SCLS.Internal.Serializer.ChunksBuilder.InMemory (
mkMachine,
Command (..),
BuilderMachine (..),
B.Command (..),
BuilderMachine,
ChunkItem (..),
B.interpretCommand,
) where

import Cardano.SCLS.Internal.Hash
import Cardano.SCLS.Internal.Record.Chunk
import Cardano.SCLS.Internal.Serializer.MemPack
import Control.Monad.Primitive
import Crypto.Hash (Blake2b_224 (Blake2b_224))
import Crypto.Hash.MerkleTree.Incremental qualified as MT
import Data.MemPack
import Cardano.SCLS.Internal.Serializer.Builder.InMemory qualified as B

import Cardano.SCLS.Internal.Serializer.Builder.InMemory (BuilderItem (Parameters))
import Data.Primitive.ByteArray
import Data.Typeable
import Foreign.Ptr
import Unsafe.Coerce (unsafeCoerce)

data ChunkItem = ChunkItem
{ chunkItemFormat :: !ChunkFormat
, chunkItemData :: !ByteArray
, chunkItemEntriesCount :: !Int
{ chunkItemFormat :: ChunkFormat
, chunkItemData :: ByteArray
, chunkItemEntriesCount :: Int
}

-- | Command for the state machine
data Command type_ where
-- | Append a new item to the buffer.
Append :: (MemPack u, Typeable u) => u -> Command (BuilderMachine, [ChunkItem])
{- | Finalize building of the buffer. Calling this command does not
-- Chunk items are parameterised by their 'ChunkFormat'. Only the raw format
-- can encode entries; the other formats fail with an error when an entry
-- is encoded.
instance B.BuilderItem ChunkItem where
  type Parameters ChunkItem = ChunkFormat
  bItemData = chunkItemData
  bItemEntriesCount = chunkItemEntriesCount
  bMkItem format payload entries =
    ChunkItem
      { chunkItemFormat = format
      , chunkItemData = payload
      , chunkItemEntriesCount = entries
      }
  bEncodeEntry format entry = case format of
    ChunkFormatRaw -> entry
    ChunkFormatZstd -> error "Chunk format zstd is not implemented yet"
    ChunkFormatZstdE -> error "Chunk format zstd-e is not implemented yet"

It's up to the implementation if the state machine can be used
after interpreting this command.
-}
Finalize :: Command (Digest, Maybe ChunkItem)
-- | The generic builder state machine specialised to emit 'ChunkItem's.
type BuilderMachine = B.BuilderMachine ChunkItem

{- | State machine for building chunks in memory.

Basically it's an interpreter for the 'Command' type that is implemented
the way that it can be inserted into the different streaming pipelines
or effect libraries
-}
newtype BuilderMachine = BuilderMachine
{ interpretCommand :: forall result. Command result -> IO result
}

-- | Create an instance of the state machine.
mkMachine ::
-- | Buffer size in bytes
Int ->
-- | Encoding format in chunks
ChunkFormat ->
IO BuilderMachine
mkMachine _ ChunkFormatZstd = error "Chunk format zstd is not implemented yet"
mkMachine _ ChunkFormatZstdE = error "Chunk format zstd-e is not implemented yet"
mkMachine bufferSize format@ChunkFormatRaw = do
-- We perform copying when we emit data outside of the state machine.
-- So this buffer is reused for all the chunks, as a result we copy
--
-- We allocate pinned memory because we pass it to the digest code
-- without copying by passing raw pointer.
storage <- newPinnedByteArray bufferSize

-- Use fix? We love fixed point combinators do we not?
let machine (!entriesCount :: Int) (!offset :: Int) !merkleTreeState =
BuilderMachine
{ interpretCommand = \case
Finalize -> do
let final = Digest $ MT.merkleRootHash $ MT.finalize merkleTreeState
if offset == 0 -- no new data, nothing to emit
then
pure (final, Nothing)
else do
frozenData <- freezeByteArrayPinned storage 0 offset
pure (final, Just ChunkItem{chunkItemEntriesCount = entriesCount, chunkItemFormat = format, chunkItemData = frozenData})
Append input -> do
let entry = Entry input
let l = packedByteCount entry
if offset + l <= bufferSize -- if we fit the current buffer we just need to write data and continue
then do
(merkleTreeState', newOffset) <-
unsafeAppendEntryToBuffer merkleTreeState storage offset entry
pure (machine (entriesCount + 1) newOffset merkleTreeState', [])
else do
-- We have no space in the current buffer, so we need to emit it first
frozenBuffer <- freezeByteArrayPinned storage 0 offset
if l > bufferSize
then do
let !tmpBuffer = pack entry
!merkleTreeState' = MT.add merkleTreeState (uncheckedByteArrayEntryContents tmpBuffer)
return
( machine 0 0 merkleTreeState'
, mkChunksToEmit [(format, frozenBuffer, entriesCount), (format, tmpBuffer, 1)]
)
else do
(merkleTreeState', newOffset) <-
unsafeAppendEntryToBuffer merkleTreeState storage 0 entry
pure
( machine 1 newOffset merkleTreeState'
, mkChunksToEmit [(format, frozenBuffer, entriesCount)]
)
}
return $! machine 0 0 (MT.empty Blake2b_224)

{- | Freeze a bytearray to the pinned immutable bytearray by copying its contents.

It's safe to use the source bytearray after this operation.
-}
freezeByteArrayPinned :: (PrimMonad m) => MutableByteArray (PrimState m) -> Int -> Int -> m ByteArray
freezeByteArrayPinned !src !off !len = do
dst <- newPinnedByteArray len
copyMutableByteArray dst 0 src off len
unsafeFreezeByteArray dst

unsafeAppendEntryToBuffer :: (MemPack u, Typeable u) => MT.MerkleTreeState Blake2b_224 -> MutableByteArray (PrimState IO) -> Int -> Entry u -> IO (MT.MerkleTreeState Blake2b_224, Int)
unsafeAppendEntryToBuffer !merkleTreeState !storage !offset u = do
newOffset <- unsafeAppendToBuffer storage offset u
let l = newOffset - offset
merkleTreeState' <- withMutableByteArrayContents storage $ \ptr -> do
let csb = CStringLenBuffer (ptr `plusPtr` (offset + 4), l - 4)
return $! MT.add merkleTreeState csb
return (merkleTreeState', newOffset)

{- | Helper to get access to the entry contents.
This method should be used on the pinned 'ByteArray' only, but the function does
not enforce this.
-}
uncheckedByteArrayEntryContents :: ByteArray -> CStringLenBuffer
uncheckedByteArrayEntryContents !buffer = CStringLenBuffer (byteArrayContents buffer `plusPtr` 4, sizeofByteArray buffer - 4)

{- | Unsafe helper that we need because MemPack interface only allows ST, and
no other PrimMonad.

There is unsafe prefix, because this function uses 'unsafeCoerce' internally,
but it ensures everything to make it safe to use.

This functions prepends the packed values with its lengths.
-}
unsafeAppendToBuffer :: (MemPack u) => MutableByteArray (PrimState IO) -> Int -> u -> IO Int
unsafeAppendToBuffer !storage !offset u = stToPrim $ do
let uInST = unsafeCoerce storage
(_, offset') <-
runStateT (runPack (packM u) uInST) offset
pure offset'

{- | Helper to create the list of chunks to emit from the list of
(format, data, count) tuples.

This function filters out the chunks with 0 entries.
-}
mkChunksToEmit :: [(ChunkFormat, ByteArray, Int)] -> [ChunkItem]
mkChunksToEmit = mkChunksToEmit' []
where
mkChunksToEmit' acc [] = reverse acc
mkChunksToEmit' acc ((_, _, 0) : xs) = mkChunksToEmit' acc xs
mkChunksToEmit' acc ((format, u, count) : xs) =
mkChunksToEmit' (ChunkItem{chunkItemFormat = format, chunkItemData = u, chunkItemEntriesCount = count} : acc) xs
{- | Create a state machine that builds 'ChunkItem's.

Takes the buffer size in bytes and the chunk parameters, and delegates to
the generic 'B.mkMachine'.
-}
mkMachine :: Int -> B.Parameters ChunkItem -> IO BuilderMachine
mkMachine bufferSize format = B.mkMachine bufferSize format
Loading