Skip to content
Closed
1 change: 1 addition & 0 deletions scls-format/scls-format.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ library
Cardano.SCLS.Internal.Record.Hdr
Cardano.SCLS.Internal.Record.Internal.Class
Cardano.SCLS.Internal.Record.Manifest
Cardano.SCLS.Internal.Record.Metadata
Cardano.SCLS.Internal.Serializer.ChunksBuilder.InMemory
Cardano.SCLS.Internal.Serializer.External.Impl
Cardano.SCLS.Internal.Serializer.MemPack
Expand Down
20 changes: 20 additions & 0 deletions scls-format/src/Cardano/SCLS/Internal/Reader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

module Cardano.SCLS.Internal.Reader (
withNamespacedData,
withRecordData,
extractRootHash,
extractNamespaceList,
extractNamespaceHash,
Expand All @@ -30,6 +31,7 @@ import Data.Typeable
import System.IO
import System.IO qualified as IO

import Cardano.SCLS.Internal.Record.Internal.Class (IsFrameRecord)
import Streaming qualified as S
import Streaming.Prelude qualified as S

Expand Down Expand Up @@ -57,6 +59,24 @@ withNamespacedData filePath namespace f =
drain rest
go next_record

{- | Stream all records for a particular record type in the file.
No optimized access, just a full scan of the file.
-}
withRecordData :: forall t b r. (IsFrameRecord t b) => FilePath -> (S.Stream (S.Of b) IO () -> IO r) -> IO r
withRecordData filePath f =
IO.withBinaryFile filePath ReadMode \handle -> f (stream handle)
where
stream handle = do
flip fix headerOffset \go record -> do
next <- S.liftIO do
fetchNextFrame handle record
for_ next \next_record -> do
dataRecord <- S.liftIO do
fetchOffsetFrame handle next_record
for_ (decodeFrame dataRecord) \metadataRecord -> do
S.yield (frameViewContent metadataRecord)
go next_record

{- | Extract the root hash from the file at the given offset.

This function does not provide additional checks.
Expand Down
100 changes: 100 additions & 0 deletions scls-format/src/Cardano/SCLS/Internal/Record/Metadata.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}

{- |
Record to hold metadata information.
-}
module Cardano.SCLS.Internal.Record.Metadata (Metadata, mkMetadata) where

import Cardano.SCLS.Internal.Hash (Digest, digest)
import Cardano.SCLS.Internal.Record.Internal.Class
import Data.Binary (Binary (get), Word32, put)
import Data.Binary.Get (getByteString, getWord32be, getWord64be, getWord8)
import Data.Binary.Put (putByteString, putWord32be, putWord64be)
import Data.ByteString qualified as BS
import Data.Word (Word64)

-- TODO: reintroduce MetadataEntry ?
-- data MetadataEntry = MetadataEntry
-- { subject :: URI
-- , value :: BS.ByteString -- CBOR
-- }
-- deriving (Show)

data MetadataFooter = MetadataFooter
{ totalEntries :: Word64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add strictness here, just to be sure

, entriesHash :: Digest
}
deriving (Show, Eq)

data Metadata = Metadata
{ entries :: BS.ByteString -- TODO: reintroduce [MetadataEntry] ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd reintroduce 'MetadataEntry' here, because otherwise it will likely be not very readable by the other clients.

, footer :: MetadataFooter
}
deriving (Show, Eq)

instance IsFrameRecord 0x31 Metadata where
encodeRecordContents Metadata{..} = do
putWord32be (fromIntegral (BS.length entries) :: Word32)
putByteString entries
-- putWord64be 0 -- end of entries
putFooter footer
where
-- putEntry MetadataEntry{..} = do
-- let subjectBytes = serializeURIRef' subject
-- subjectLen = BS.length subjectBytes
-- valueLen = BS.length value

-- putWord64be (fromIntegral subjectLen)
-- putByteString subjectBytes
-- putWord64be (fromIntegral valueLen)
-- putByteString value

putFooter MetadataFooter{..} = do
putWord64be totalEntries
put entriesHash

decodeRecordContents = do
_ <- getWord8 -- type offset: maintain consistency with Chunk pattern
entriesSize <- getWord32be
entries <- getByteString (fromIntegral entriesSize)
-- _ <- getWord64be -- end of entries marker
footer <- getFooter
pure Metadata{..}
where
-- getEntry = do
-- len <- getWord64be
-- if len == 0
-- then return Nothing
-- else do
-- subjectLen <- getWord64be
-- subjectBytes <- getByteString (fromIntegral subjectLen)
-- let subject = case parseURI strictURIParserOptions subjectBytes of
-- Left err -> error $ "Failed to parse URI: " ++ show err
-- Right uri -> uri
-- valueLen <- getWord64be
-- value <- getByteString (fromIntegral valueLen)
-- pure (Just MetadataEntry{..})

getFooter = do
totalEntries <- getWord64be
entriesHash <- get
pure MetadataFooter{..}

mkMetadata :: BS.ByteString -> Word64 -> Metadata
mkMetadata entries totalEntries =
let
-- entries = [MetadataEntry subject value | (subject, value) <- metadataEntries]
-- totalEntries = fromIntegral $ length entries
entriesHash = digest entries
footer = MetadataFooter{..}
in
Metadata{..}
where

-- digestEntries entries =
-- digest $ mconcat $ map entryDigestInput entries
-- where
-- entryDigestInput MetadataEntry{..} =
-- BS.singleton 0x01 <> serializeURIRef' subject <> value
16 changes: 10 additions & 6 deletions scls-format/src/Cardano/SCLS/Internal/Serializer/External/Impl.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ViewPatterns #-}

module Cardano.SCLS.Internal.Serializer.External.Impl (
Expand Down Expand Up @@ -52,18 +53,21 @@ serialize ::
-- | Slot of the current transaction
SlotNo ->
-- | Input stream of entries to serialize, can be unsorted
S.Stream (S.Of (InputChunk a)) IO () ->
DumpConfig a ->
IO ()
serialize resultFilePath network slotNo inputStream = do
serialize resultFilePath network slotNo (DumpConfig{..}) = do
let !hdr = mkHdr network slotNo
withTempDirectory (takeDirectory resultFilePath) "tmp.XXXXXX" \tmpDir -> do
prepareExternalSortNamespaced tmpDir inputStream
prepareExternalSortNamespaced tmpDir configChunkStream
handles <- newIORef []
onException
do
withBinaryFile resultFilePath WriteMode \handle -> do
dumpToHandle handle hdr $
sourceNs handles tmpDir
DumpConfigSorted $
DumpConfig
(sourceNs handles tmpDir)
configMetadataStream
do traverse hClose =<< readIORef handles

{- | Accepts an unordered stream of entries, and prepares a structure of
Expand Down Expand Up @@ -195,8 +199,8 @@ merge2 f1 f2 = do
loop

-- | Create a stream from the list of namespaces.
sourceNs :: IORef [Handle] -> FilePath -> DataStream RawBytes
sourceNs handles baseDir = DataStream do
sourceNs :: IORef [Handle] -> FilePath -> Stream (Of (InputChunk RawBytes)) IO ()
sourceNs handles baseDir = do
ns <- liftIO $ listDirectory baseDir
S.each ns & S.map (\n -> (T.pack n :> kMergeNs handles (baseDir </> n)))

Expand Down
52 changes: 47 additions & 5 deletions scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Dump.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
module Cardano.SCLS.Internal.Serializer.Reference.Dump (
DataStream (..),
InputChunk,
DumpConfig (..),
DumpConfigSorted (..),
newDumpConfig,
withChunks,
withMetadata,
dumpToHandle,
constructChunks_,
) where
Expand All @@ -15,6 +20,7 @@ import Cardano.SCLS.Internal.Hash (Digest (..))
import Cardano.SCLS.Internal.Record.Chunk
import Cardano.SCLS.Internal.Record.Hdr
import Cardano.SCLS.Internal.Record.Manifest
import Cardano.SCLS.Internal.Record.Metadata
import Cardano.SCLS.Internal.Serializer.ChunksBuilder.InMemory
import Crypto.Hash.MerkleTree.Incremental qualified as MT

Expand Down Expand Up @@ -54,16 +60,47 @@ This type is used as input to chunked serialization routines, which expect the d
-}
newtype DataStream a = DataStream {runDataStream :: Stream (Of (InputChunk a)) IO ()}

-- | Configuration for dumping data to a handle.
data DumpConfig a = (MemPack a, Typeable a) => DumpConfig
{ configChunkStream :: Stream (Of (InputChunk a)) IO ()
, configMetadataStream :: Maybe (Stream (Of Metadata) IO ())
-- Future fields for more dump configurations can be added here
-- e.g. configIsToBuildIndex, configDeltaStream, etc.
}

newtype DumpConfigSorted a = DumpConfigSorted {getDumpConfigSorted :: DumpConfig a}

-- | Create a new empty dump configuration.
newDumpConfig :: forall a. (MemPack a, Typeable a) => DumpConfig a
newDumpConfig = DumpConfig{configChunkStream = mempty, configMetadataStream = Nothing}

-- | Add a chunked data stream to the dump configuration.
withChunks :: (MemPack a, Typeable a) => Stream (Of (InputChunk a)) IO () -> DumpConfig a -> DumpConfig a
withChunks stream DumpConfig{..} =
DumpConfig
{ configChunkStream = configChunkStream <> stream
, configMetadataStream = configMetadataStream
}

-- | Add a metadata stream to the dump configuration.
withMetadata :: Stream (Of Metadata) IO () -> DumpConfig a -> DumpConfig a
withMetadata stream (DumpConfig{..}) =
DumpConfig
{ configChunkStream = configChunkStream
, configMetadataStream = Just stream
}

-- Dumps data to the handle, while splitting it into chunks.
--
-- This is reference implementation and it does not yet care about
-- proper working with the hardware, i.e. flushing and calling fsync
-- at the right moments.
dumpToHandle :: (MemPack a, Typeable a) => Handle -> Hdr -> DataStream a -> IO ()
dumpToHandle handle hdr orderedStream = do
dumpToHandle :: (MemPack a, Typeable a) => Handle -> Hdr -> DumpConfigSorted a -> IO ()
dumpToHandle handle hdr config = do
let DumpConfig{..} = getDumpConfigSorted config
_ <- hWriteFrame handle hdr
manifestData :: ManifestInfo <-
runDataStream orderedStream -- output our sorted stream
manifestData <-
configChunkStream -- output our sorted stream
& S.mapM
( \(namespace :> inner) -> do
inner
Expand All @@ -87,7 +124,12 @@ dumpToHandle handle hdr orderedStream = do
}
in Map.insert namespace ni rest
mempty
do \x -> ManifestInfo x
ManifestInfo

case configMetadataStream of
Nothing -> pure ()
Just metadataStream -> S.mapM_ (hWriteFrame handle) metadataStream

manifest <- mkManifest manifestData
_ <- hWriteFrame handle manifest
pure ()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE RecordWildCards #-}

module Cardano.SCLS.Internal.Serializer.Reference.Impl (
serialize,
Expand Down Expand Up @@ -34,15 +35,17 @@ serialize ::
NetworkId ->
-- | Slot of the current transaction
SlotNo ->
-- | Input stream of entries to serialize, can be unsorted
(S.Stream (S.Of (InputChunk a)) IO ()) ->
DumpConfig a ->
IO ()
serialize resultFilePath network slotNo stream = do
serialize resultFilePath network slotNo (DumpConfig{..}) = do
withBinaryFile resultFilePath WriteMode \handle -> do
let hdr = mkHdr network slotNo
!orderedStream <- mkVectors stream
dumpToHandle handle hdr do
DataStream (S.each [n S.:> S.each v | (n, v) <- Map.toList orderedStream])
!orderedStream <- mkVectors configChunkStream
dumpToHandle handle hdr $
DumpConfigSorted $
DumpConfig
((S.each [n S.:> S.each v | (n, v) <- Map.toList orderedStream]))
configMetadataStream
where
mkVectors :: (Ord a) => S.Stream (S.Of (InputChunk a)) IO () -> IO (Map Text (V.Vector a))
mkVectors = do
Expand Down
19 changes: 11 additions & 8 deletions scls-format/test/MultiNamespace.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Cardano.SCLS.Internal.Hash (Digest (..))
import Cardano.SCLS.Internal.Reader (extractNamespaceHash, extractNamespaceList, extractRootHash, withNamespacedData)
import Cardano.SCLS.Internal.Serializer.External.Impl qualified as External (serialize)
import Cardano.SCLS.Internal.Serializer.MemPack
import Cardano.SCLS.Internal.Serializer.Reference.Impl (InputChunk)
import Cardano.SCLS.Internal.Serializer.Reference.Dump (DumpConfig, newDumpConfig, withChunks)
import Cardano.SCLS.Internal.Serializer.Reference.Impl qualified as Reference (serialize)
import Cardano.Types.Network (NetworkId (..))
import Cardano.Types.SlotNo (SlotNo (..))
Expand Down Expand Up @@ -68,7 +68,7 @@ mkTestsFor serialize = do
let input = [("ns0", []), ("ns1", ["data"]), ("ns2", [])]
roundtrip serialize input

type SerializeF = FilePath -> NetworkId -> SlotNo -> S.Stream (S.Of (InputChunk RawBytes)) IO () -> IO ()
type SerializeF = FilePath -> NetworkId -> SlotNo -> DumpConfig RawBytes -> IO ()

roundtrip :: SerializeF -> [(Text, [ByteString])] -> IO ()
roundtrip serialize input = do
Expand All @@ -79,7 +79,7 @@ roundtrip serialize input = do
fileName
Mainnet
(SlotNo 1)
mkStream
mkConfig
nsps <- extractNamespaceList fileName
annotate "Namespaces are as expected" do
(sort nsps) `shouldBe` (Map.keys nsData)
Expand Down Expand Up @@ -111,9 +111,12 @@ roundtrip serialize input = do
annotate "File hash matches expected" do
fileDigest `shouldBe` expectedDigest
where
mkStream =
S.each
[ n S.:> (S.each q & S.map RawBytes)
| (n, q) <- input
]
mkConfig =
newDumpConfig
& ( withChunks $
S.each
[ n S.:> (S.each q & S.map RawBytes)
| (n, q) <- input
]
)
nsData = Map.fromListWith (<>) input
Loading