diff --git a/scls-format/scls-format.cabal b/scls-format/scls-format.cabal index a25ee4d..3d4bec8 100644 --- a/scls-format/scls-format.cabal +++ b/scls-format/scls-format.cabal @@ -28,6 +28,7 @@ library Cardano.SCLS.Internal.Record.Hdr Cardano.SCLS.Internal.Record.Internal.Class Cardano.SCLS.Internal.Record.Manifest + Cardano.SCLS.Internal.Record.Metadata Cardano.SCLS.Internal.Serializer.ChunksBuilder.InMemory Cardano.SCLS.Internal.Serializer.External.Impl Cardano.SCLS.Internal.Serializer.MemPack diff --git a/scls-format/src/Cardano/SCLS/Internal/Reader.hs b/scls-format/src/Cardano/SCLS/Internal/Reader.hs index df20541..0607a21 100644 --- a/scls-format/src/Cardano/SCLS/Internal/Reader.hs +++ b/scls-format/src/Cardano/SCLS/Internal/Reader.hs @@ -5,6 +5,7 @@ module Cardano.SCLS.Internal.Reader ( withNamespacedData, + withRecordData, extractRootHash, extractNamespaceList, extractNamespaceHash, @@ -30,6 +31,7 @@ import Data.Typeable import System.IO import System.IO qualified as IO +import Cardano.SCLS.Internal.Record.Internal.Class (IsFrameRecord) import Streaming qualified as S import Streaming.Prelude qualified as S @@ -57,6 +59,24 @@ withNamespacedData filePath namespace f = drain rest go next_record +{- | Stream all records for a particular record type in the file. +No optimized access, just a full scan of the file. +-} +withRecordData :: forall t b r. (IsFrameRecord t b) => FilePath -> (S.Stream (S.Of b) IO () -> IO r) -> IO r +withRecordData filePath f = + IO.withBinaryFile filePath ReadMode \handle -> f (stream handle) + where + stream handle = do + flip fix headerOffset \go record -> do + next <- S.liftIO do + fetchNextFrame handle record + for_ next \next_record -> do + dataRecord <- S.liftIO do + fetchOffsetFrame handle next_record + for_ (decodeFrame dataRecord) \metadataRecord -> do + S.yield (frameViewContent metadataRecord) + go next_record + {- | Extract the root hash from the file at the given offset. This function does not provide additional checks. diff --git a/scls-format/src/Cardano/SCLS/Internal/Record/Metadata.hs b/scls-format/src/Cardano/SCLS/Internal/Record/Metadata.hs new file mode 100644 index 0000000..d97004d --- /dev/null +++ b/scls-format/src/Cardano/SCLS/Internal/Record/Metadata.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RecordWildCards #-} + +{- | +Record to hold metadata information. +-} +module Cardano.SCLS.Internal.Record.Metadata (Metadata, mkMetadata) where + +import Cardano.SCLS.Internal.Hash (Digest, digest) +import Cardano.SCLS.Internal.Record.Internal.Class +import Data.Binary (Binary (get), Word32, put) +import Data.Binary.Get (getByteString, getWord32be, getWord64be, getWord8) +import Data.Binary.Put (putByteString, putWord32be, putWord64be) +import Data.ByteString qualified as BS +import Data.Word (Word64) + +-- TODO: reintroduce MetadataEntry ? +-- data MetadataEntry = MetadataEntry +-- { subject :: URI +-- , value :: BS.ByteString -- CBOR +-- } +-- deriving (Show) + +data MetadataFooter = MetadataFooter + { totalEntries :: Word64 + , entriesHash :: Digest + } + deriving (Show, Eq) + +data Metadata = Metadata + { entries :: BS.ByteString -- TODO: reintroduce [MetadataEntry] ? + , footer :: MetadataFooter + } + deriving (Show, Eq) + +instance IsFrameRecord 0x31 Metadata where + encodeRecordContents Metadata{..} = do + putWord32be (fromIntegral (BS.length entries) :: Word32) + putByteString entries + -- putWord64be 0 -- end of entries + putFooter footer + where + -- putEntry MetadataEntry{..} = do + -- let subjectBytes = serializeURIRef' subject + -- subjectLen = BS.length subjectBytes + -- valueLen = BS.length value + + -- putWord64be (fromIntegral subjectLen) + -- putByteString subjectBytes + -- putWord64be (fromIntegral valueLen) + -- putByteString value + + putFooter MetadataFooter{..} = do + putWord64be totalEntries + put entriesHash + + decodeRecordContents = do + _ <- getWord8 -- type offset: maintain consistency with Chunk pattern + entriesSize <- getWord32be + entries <- getByteString (fromIntegral entriesSize) + -- _ <- getWord64be -- end of entries marker + footer <- getFooter + pure Metadata{..} + where + -- getEntry = do + -- len <- getWord64be + -- if len == 0 + -- then return Nothing + -- else do + -- subjectLen <- getWord64be + -- subjectBytes <- getByteString (fromIntegral subjectLen) + -- let subject = case parseURI strictURIParserOptions subjectBytes of + -- Left err -> error $ "Failed to parse URI: " ++ show err + -- Right uri -> uri + -- valueLen <- getWord64be + -- value <- getByteString (fromIntegral valueLen) + -- pure (Just MetadataEntry{..}) + + getFooter = do + totalEntries <- getWord64be + entriesHash <- get + pure MetadataFooter{..} + +mkMetadata :: BS.ByteString -> Word64 -> Metadata +mkMetadata entries totalEntries = + let + -- entries = [MetadataEntry subject value | (subject, value) <- metadataEntries] + -- totalEntries = fromIntegral $ length entries + entriesHash = digest entries + footer = MetadataFooter{..} + in + Metadata{..} + where + +-- digestEntries entries = +-- digest $ mconcat $ map entryDigestInput entries +-- where +-- entryDigestInput MetadataEntry{..} = +-- BS.singleton 0x01 <> serializeURIRef' subject <> value diff --git a/scls-format/src/Cardano/SCLS/Internal/Serializer/External/Impl.hs b/scls-format/src/Cardano/SCLS/Internal/Serializer/External/Impl.hs index 5c2a72a..53a9aa8 100644 --- a/scls-format/src/Cardano/SCLS/Internal/Serializer/External/Impl.hs +++ b/scls-format/src/Cardano/SCLS/Internal/Serializer/External/Impl.hs @@ -1,5 +1,6 @@ {-# LANGUAGE BlockArguments #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ViewPatterns #-} module Cardano.SCLS.Internal.Serializer.External.Impl ( @@ -52,18 +53,21 @@ serialize :: -- | Slot of the current transaction SlotNo -> -- | Input stream of entries to serialize, can be unsorted - S.Stream (S.Of (InputChunk a)) IO () -> + DumpConfig a -> IO () -serialize resultFilePath network slotNo inputStream = do +serialize resultFilePath network slotNo (DumpConfig{..}) = do let !hdr = mkHdr network slotNo withTempDirectory (takeDirectory resultFilePath) "tmp.XXXXXX" \tmpDir -> do - prepareExternalSortNamespaced tmpDir inputStream + prepareExternalSortNamespaced tmpDir configChunkStream handles <- newIORef [] onException do withBinaryFile resultFilePath WriteMode \handle -> do dumpToHandle handle hdr $ - sourceNs handles tmpDir + DumpConfigSorted $ + DumpConfig + (sourceNs handles tmpDir) + configMetadataStream do traverse hClose =<< readIORef handles {- | Accepts an unordered stream of entries, and prepares a structure of @@ -195,8 +199,8 @@ merge2 f1 f2 = do loop -- | Create a stream from the list of namespaces. -sourceNs :: IORef [Handle] -> FilePath -> DataStream RawBytes -sourceNs handles baseDir = DataStream do +sourceNs :: IORef [Handle] -> FilePath -> Stream (Of (InputChunk RawBytes)) IO () +sourceNs handles baseDir = do ns <- liftIO $ listDirectory baseDir S.each ns & S.map (\n -> (T.pack n :> kMergeNs handles (baseDir n))) diff --git a/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Dump.hs b/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Dump.hs index 2f2e846..140cb80 100644 --- a/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Dump.hs +++ b/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Dump.hs @@ -6,6 +6,11 @@ module Cardano.SCLS.Internal.Serializer.Reference.Dump ( DataStream (..), InputChunk, + DumpConfig (..), + DumpConfigSorted (..), + newDumpConfig, + withChunks, + withMetadata, dumpToHandle, constructChunks_, ) where @@ -15,6 +20,7 @@ import Cardano.SCLS.Internal.Hash (Digest (..)) import Cardano.SCLS.Internal.Record.Chunk import Cardano.SCLS.Internal.Record.Hdr import Cardano.SCLS.Internal.Record.Manifest +import Cardano.SCLS.Internal.Record.Metadata import Cardano.SCLS.Internal.Serializer.ChunksBuilder.InMemory import Crypto.Hash.MerkleTree.Incremental qualified as MT @@ -54,16 +60,47 @@ This type is used as input to chunked serialization routines, which expect the d -} newtype DataStream a = DataStream {runDataStream :: Stream (Of (InputChunk a)) IO ()} +-- | Configuration for dumping data to a handle. +data DumpConfig a = (MemPack a, Typeable a) => DumpConfig + { configChunkStream :: Stream (Of (InputChunk a)) IO () + , configMetadataStream :: Maybe (Stream (Of Metadata) IO ()) + -- Future fields for more dump configurations can be added here + -- e.g. configIsToBuildIndex, configDeltaStream, etc. + } + +newtype DumpConfigSorted a = DumpConfigSorted {getDumpConfigSorted :: DumpConfig a} + +-- | Create a new empty dump configuration. +newDumpConfig :: forall a. (MemPack a, Typeable a) => DumpConfig a +newDumpConfig = DumpConfig{configChunkStream = mempty, configMetadataStream = Nothing} + +-- | Add a chunked data stream to the dump configuration. +withChunks :: (MemPack a, Typeable a) => Stream (Of (InputChunk a)) IO () -> DumpConfig a -> DumpConfig a +withChunks stream DumpConfig{..} = + DumpConfig + { configChunkStream = configChunkStream <> stream + , configMetadataStream = configMetadataStream + } + +-- | Add a metadata stream to the dump configuration. +withMetadata :: Stream (Of Metadata) IO () -> DumpConfig a -> DumpConfig a +withMetadata stream (DumpConfig{..}) = + DumpConfig + { configChunkStream = configChunkStream + , configMetadataStream = Just stream + } + -- Dumps data to the handle, while splitting it into chunks. -- -- This is reference implementation and it does not yet care about -- proper working with the hardware, i.e. flushing and calling fsync -- at the right moments. -dumpToHandle :: (MemPack a, Typeable a) => Handle -> Hdr -> DataStream a -> IO () -dumpToHandle handle hdr orderedStream = do +dumpToHandle :: (MemPack a, Typeable a) => Handle -> Hdr -> DumpConfigSorted a -> IO () +dumpToHandle handle hdr config = do + let DumpConfig{..} = getDumpConfigSorted config _ <- hWriteFrame handle hdr - manifestData :: ManifestInfo <- - runDataStream orderedStream -- output our sorted stream + manifestData <- + configChunkStream -- output our sorted stream & S.mapM ( \(namespace :> inner) -> do inner @@ -87,7 +124,12 @@ dumpToHandle handle hdr orderedStream = do } in Map.insert namespace ni rest mempty - do \x -> ManifestInfo x + ManifestInfo + + case configMetadataStream of + Nothing -> pure () + Just metadataStream -> S.mapM_ (hWriteFrame handle) metadataStream + manifest <- mkManifest manifestData _ <- hWriteFrame handle manifest pure () diff --git a/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Impl.hs b/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Impl.hs index 8d5af00..cfcb8f2 100644 --- a/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Impl.hs +++ b/scls-format/src/Cardano/SCLS/Internal/Serializer/Reference/Impl.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BlockArguments #-} +{-# LANGUAGE RecordWildCards #-} module Cardano.SCLS.Internal.Serializer.Reference.Impl ( serialize, @@ -34,15 +35,17 @@ serialize :: NetworkId -> -- | Slot of the current transaction SlotNo -> - -- | Input stream of entries to serialize, can be unsorted - (S.Stream (S.Of (InputChunk a)) IO ()) -> + DumpConfig a -> IO () -serialize resultFilePath network slotNo stream = do +serialize resultFilePath network slotNo (DumpConfig{..}) = do withBinaryFile resultFilePath WriteMode \handle -> do let hdr = mkHdr network slotNo - !orderedStream <- mkVectors stream - dumpToHandle handle hdr do - DataStream (S.each [n S.:> S.each v | (n, v) <- Map.toList orderedStream]) + !orderedStream <- mkVectors configChunkStream + dumpToHandle handle hdr $ + DumpConfigSorted $ + DumpConfig + ((S.each [n S.:> S.each v | (n, v) <- Map.toList orderedStream])) + configMetadataStream where mkVectors :: (Ord a) => S.Stream (S.Of (InputChunk a)) IO () -> IO (Map Text (V.Vector a)) mkVectors = do diff --git a/scls-format/test/MultiNamespace.hs b/scls-format/test/MultiNamespace.hs index e6e15cc..a6124a1 100644 --- a/scls-format/test/MultiNamespace.hs +++ b/scls-format/test/MultiNamespace.hs @@ -9,7 +9,7 @@ import Cardano.SCLS.Internal.Hash (Digest (..)) import Cardano.SCLS.Internal.Reader (extractNamespaceHash, extractNamespaceList, extractRootHash, withNamespacedData) import Cardano.SCLS.Internal.Serializer.External.Impl qualified as External (serialize) import Cardano.SCLS.Internal.Serializer.MemPack -import Cardano.SCLS.Internal.Serializer.Reference.Impl (InputChunk) +import Cardano.SCLS.Internal.Serializer.Reference.Dump (DumpConfig, newDumpConfig, withChunks) import Cardano.SCLS.Internal.Serializer.Reference.Impl qualified as Reference (serialize) import Cardano.Types.Network (NetworkId (..)) import Cardano.Types.SlotNo (SlotNo (..)) @@ -68,7 +68,7 @@ mkTestsFor serialize = do let input = [("ns0", []), ("ns1", ["data"]), ("ns2", [])] roundtrip serialize input -type SerializeF = FilePath -> NetworkId -> SlotNo -> S.Stream (S.Of (InputChunk RawBytes)) IO () -> IO () +type SerializeF = FilePath -> NetworkId -> SlotNo -> DumpConfig RawBytes -> IO () roundtrip :: SerializeF -> [(Text, [ByteString])] -> IO () roundtrip serialize input = do @@ -79,7 +79,7 @@ roundtrip serialize input = do fileName Mainnet (SlotNo 1) - mkStream + mkConfig nsps <- extractNamespaceList fileName annotate "Namespaces are as expected" do (sort nsps) `shouldBe` (Map.keys nsData) @@ -111,9 +111,12 @@ roundtrip serialize input = do annotate "File hash matches expected" do fileDigest `shouldBe` expectedDigest where - mkStream = - S.each - [ n S.:> (S.each q & S.map RawBytes) - | (n, q) <- input - ] + mkConfig = + newDumpConfig + & ( withChunks $ + S.each + [ n S.:> (S.each q & S.map RawBytes) + | (n, q) <- input + ] + ) nsData = Map.fromListWith (<>) input diff --git a/scls-format/test/Roundtrip.hs b/scls-format/test/Roundtrip.hs index fdb8a0a..ee0d868 100644 --- a/scls-format/test/Roundtrip.hs +++ b/scls-format/test/Roundtrip.hs @@ -6,10 +6,11 @@ module Roundtrip ( import Cardano.SCLS.CDDL (namespaces) import Cardano.SCLS.Internal.Hash (Digest (..)) -import Cardano.SCLS.Internal.Reader (extractRootHash, withNamespacedData) +import Cardano.SCLS.Internal.Reader (extractRootHash, withNamespacedData, withRecordData) +import Cardano.SCLS.Internal.Record.Metadata (mkMetadata) import Cardano.SCLS.Internal.Serializer.External.Impl qualified as External (serialize) import Cardano.SCLS.Internal.Serializer.MemPack -import Cardano.SCLS.Internal.Serializer.Reference.Impl (InputChunk) +import Cardano.SCLS.Internal.Serializer.Reference.Dump (DumpConfig, newDumpConfig, withChunks, withMetadata) import Cardano.SCLS.Internal.Serializer.Reference.Impl qualified as Reference (serialize) import Cardano.Types.Network (NetworkId (..)) import Cardano.Types.SlotNo (SlotNo (..)) @@ -59,13 +60,20 @@ mkRoundtripTestsFor groupName serialize = replicateM 1024 $ applyAtomicGen (generateCBORTerm' mt (Name (T.pack "record_entry") mempty)) globalStdGen let encoded_data = [toStrictByteString (encodeTerm term) | term <- data_] + sorted_encoded_data = sort encoded_data let fileName = (fn "data.scls") _ <- serialize fileName Mainnet (SlotNo 1) - (S.each [(namespace S.:> (S.each encoded_data & S.map RawBytes))]) + $ ( newDumpConfig + & withChunks + (S.each [(namespace S.:> (S.each encoded_data & S.map RawBytes))]) + -- TODO: metadata entry supposedly is { subject: URI, entries: CBOR-encoded bytes} + -- reuse encoded_data for metadata for now + & withMetadata (S.each encoded_data & S.map (\bytes -> mkMetadata bytes 1024)) + ) withNamespacedData fileName namespace @@ -74,16 +82,26 @@ mkRoundtripTestsFor groupName serialize = annotate "Stream roundtrip successful" $ [b | RawBytes b <- decoded_data] - `shouldBe` (sort encoded_data) + `shouldBe` sorted_encoded_data ) -- Check roundtrip of root hash file_digest <- extractRootHash fileName expected_digest <- - S.each (sort encoded_data) + S.each sorted_encoded_data & S.fold_ MT.add (MT.empty undefined) (Digest . MT.merkleRootHash . MT.finalize) annotate "Root hash roundtrip successful" $ file_digest `shouldBe` (Digest $ MT.merkleRootHash $ MT.finalize $ MT.add (MT.empty undefined) expected_digest) -type SerializeF = FilePath -> NetworkId -> SlotNo -> S.Stream (S.Of (InputChunk RawBytes)) IO () -> IO () + withRecordData + fileName + ( \stream -> do + decoded_metadata <- S.toList_ stream + annotate + "Metadata stream roundtrip successful" + $ (decoded_metadata) + `shouldBe` ([mkMetadata bytes 1024 | bytes <- encoded_data]) -- TODO: should be sorted + ) + +type SerializeF = FilePath -> NetworkId -> SlotNo -> DumpConfig RawBytes -> IO ()