diff --git a/CMakeLists.txt b/CMakeLists.txt index 1fc13f57..b36e5d96 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ find_package(PkgConfig REQUIRED) find_package(Boost CONFIG REQUIRED COMPONENTS algorithm filesystem outcome program_options property_tree random) find_package(Boost.DI CONFIG REQUIRED) +find_package(Crc32c CONFIG REQUIRED) find_package(fmt CONFIG REQUIRED) find_package(hashtree CONFIG REQUIRED) find_package(libp2p CONFIG REQUIRED) diff --git a/src/modules/networking/CMakeLists.txt b/src/modules/networking/CMakeLists.txt index 33897306..3dfad8d4 100644 --- a/src/modules/networking/CMakeLists.txt +++ b/src/modules/networking/CMakeLists.txt @@ -20,7 +20,7 @@ add_lean_module(networking bootnodes p2p::libp2p qtils::qtils - Snappy::snappy + snappy soralog::soralog sszpp validator_registry diff --git a/src/modules/networking/block_request_protocol.cpp b/src/modules/networking/block_request_protocol.cpp index 116217fd..477f46ea 100644 --- a/src/modules/networking/block_request_protocol.cpp +++ b/src/modules/networking/block_request_protocol.cpp @@ -12,6 +12,7 @@ #include #include "blockchain/block_tree.hpp" +#include "modules/networking/response_status.hpp" #include "modules/networking/ssz_snappy.hpp" namespace lean::modules { @@ -24,7 +25,7 @@ namespace lean::modules { block_tree_{std::move(block_tree)} {} libp2p::StreamProtocols BlockRequestProtocol::getProtocolIds() const { - return {"/leanconsensus/req/blocks_by_root/1/ssz_snappy"}; + return {"/leanconsensus/req/lean_blocks_by_root/1/ssz_snappy"}; } void BlockRequestProtocol::handle(std::shared_ptr stream) { @@ -43,12 +44,13 @@ namespace lean::modules { libp2p::PeerId peer_id, BlockRequest request) { BOOST_OUTCOME_CO_TRY(auto stream, co_await host_->newStream(peer_id, getProtocolIds())); - BOOST_OUTCOME_CO_TRY( - co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(request))); + BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( + stream, encodeSszSnappyFramed(request))); + BOOST_OUTCOME_CO_TRY(co_await readResponseStatus(stream)); qtils::ByteVec encoded; BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); BOOST_OUTCOME_CO_TRY(auto response, - decodeSszSnappy(encoded)); + decodeSszSnappyFramed(encoded)); co_return response; } @@ -56,7 +58,8 @@ namespace lean::modules { std::shared_ptr stream) { qtils::ByteVec encoded; BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); - BOOST_OUTCOME_CO_TRY(auto request, decodeSszSnappy(encoded)); + BOOST_OUTCOME_CO_TRY(auto request, + decodeSszSnappyFramed(encoded)); BlockResponse response; for (auto &block_hash : request.blocks) { BOOST_OUTCOME_CO_TRY(auto block, @@ -65,8 +68,9 @@ namespace lean::modules { response.blocks.push_back(std::move(block.value())); } } - BOOST_OUTCOME_CO_TRY( - co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(response))); + BOOST_OUTCOME_CO_TRY(co_await writeResponseStatus(stream)); + BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( + stream, encodeSszSnappyFramed(response))); co_return outcome::success(); } } // namespace lean::modules diff --git a/src/modules/networking/networking.cpp b/src/modules/networking/networking.cpp index 68816a10..5289fb22 100644 --- a/src/modules/networking/networking.cpp +++ b/src/modules/networking/networking.cpp @@ -58,7 +58,7 @@ namespace lean::modules { hasher.write(size).value(); hasher.write(message.topic).value(); }; - if (auto uncompressed_res = snappyUncompress(message.data)) { + if (auto uncompressed_res = snappy::uncompress(message.data)) { auto &uncompressed = uncompressed_res.value(); hash_topic(); hasher.write(MESSAGE_DOMAIN_VALID_SNAPPY).value(); diff --git a/src/modules/networking/response_status.hpp b/src/modules/networking/response_status.hpp new file mode 100644 index 00000000..4d471ab3 --- /dev/null +++ b/src/modules/networking/response_status.hpp @@ -0,0 +1,27 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +namespace lean { + inline libp2p::CoroOutcome writeResponseStatus( + std::shared_ptr writer) { + qtils::ByteArr<1> status{0}; + BOOST_OUTCOME_CO_TRY(co_await libp2p::write(writer, status)); + co_return outcome::success(); + } + + inline libp2p::CoroOutcome readResponseStatus( + std::shared_ptr reader) { + qtils::ByteArr<1> status; + BOOST_OUTCOME_CO_TRY(co_await libp2p::read(reader, status)); + co_return outcome::success(); + } +} // namespace lean diff --git a/src/modules/networking/ssz_snappy.hpp b/src/modules/networking/ssz_snappy.hpp index 910d8da9..13fffd90 100644 --- a/src/modules/networking/ssz_snappy.hpp +++ b/src/modules/networking/ssz_snappy.hpp @@ -11,12 +11,22 @@ namespace lean { auto encodeSszSnappy(const auto &t) { - return snappyCompress(encode(t).value()); + return snappy::compress(encode(t).value()); } template outcome::result decodeSszSnappy(qtils::BytesIn compressed) { - BOOST_OUTCOME_TRY(auto uncompressed, snappyUncompress(compressed)); + BOOST_OUTCOME_TRY(auto uncompressed, snappy::uncompress(compressed)); + return decode(uncompressed); + } + + auto encodeSszSnappyFramed(const auto &t) { + return snappy::compressFramed(encode(t).value()); + } + + template + outcome::result decodeSszSnappyFramed(qtils::BytesIn compressed) { + BOOST_OUTCOME_TRY(auto uncompressed, snappy::uncompressFramed(compressed)); return decode(uncompressed); } } // namespace lean diff --git a/src/modules/networking/status_protocol.cpp b/src/modules/networking/status_protocol.cpp index e07466e9..caf1ed0f 100644 --- a/src/modules/networking/status_protocol.cpp +++ b/src/modules/networking/status_protocol.cpp @@ -11,6 +11,7 @@ #include #include +#include "modules/networking/response_status.hpp" #include "modules/networking/ssz_snappy.hpp" namespace lean::modules { @@ -44,22 +45,38 @@ namespace lean::modules { std::shared_ptr connection) { BOOST_OUTCOME_CO_TRY( auto stream, co_await host_->newStream(connection, getProtocolIds())); - BOOST_OUTCOME_CO_TRY(co_await coroHandle(stream)); + BOOST_OUTCOME_CO_TRY(co_await write(stream)); + BOOST_OUTCOME_CO_TRY(co_await readResponseStatus(stream)); + BOOST_OUTCOME_CO_TRY(co_await read(stream)); co_return outcome::success(); } - libp2p::CoroOutcome StatusProtocol::coroHandle( + libp2p::CoroOutcome StatusProtocol::read( std::shared_ptr stream) { auto peer_id = stream->remotePeerId(); - BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( - stream, encodeSszSnappy(get_status_()))); qtils::ByteVec encoded; BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); - BOOST_OUTCOME_CO_TRY(auto status, decodeSszSnappy(encoded)); + BOOST_OUTCOME_CO_TRY(auto status, + decodeSszSnappyFramed(encoded)); on_status_(messages::StatusMessageReceived{ .from_peer = peer_id, .notification = status, }); co_return outcome::success(); } + + libp2p::CoroOutcome StatusProtocol::write( + std::shared_ptr stream) { + BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( + stream, encodeSszSnappyFramed(get_status_()))); + co_return outcome::success(); + } + + libp2p::CoroOutcome StatusProtocol::coroHandle( + std::shared_ptr stream) { + BOOST_OUTCOME_CO_TRY(co_await read(stream)); + BOOST_OUTCOME_CO_TRY(co_await writeResponseStatus(stream)); + BOOST_OUTCOME_CO_TRY(co_await write(stream)); + co_return outcome::success(); + } } // namespace lean::modules diff --git a/src/modules/networking/status_protocol.hpp b/src/modules/networking/status_protocol.hpp index d433eef3..374d3b4b 100644 --- a/src/modules/networking/status_protocol.hpp +++ b/src/modules/networking/status_protocol.hpp @@ -42,6 +42,8 @@ namespace lean::modules { std::shared_ptr connection); private: + libp2p::CoroOutcome read(std::shared_ptr stream); + libp2p::CoroOutcome write(std::shared_ptr stream); libp2p::CoroOutcome coroHandle( std::shared_ptr stream); diff --git a/src/serde/CMakeLists.txt b/src/serde/CMakeLists.txt index 8f54ae70..c198cf6c 100644 --- a/src/serde/CMakeLists.txt +++ b/src/serde/CMakeLists.txt @@ -12,3 +12,10 @@ target_link_libraries(enr p2p::libp2p p2p::p2p_key_validator ) + +add_library(snappy INTERFACE) +target_link_libraries(snappy INTERFACE + Crc32c::crc32c + qtils::qtils + Snappy::snappy +) diff --git a/src/serde/snappy.hpp b/src/serde/snappy.hpp index e02526cd..0f21297b 100644 --- a/src/serde/snappy.hpp +++ b/src/serde/snappy.hpp @@ -8,13 +8,20 @@ #include +#include +#include +#include #include #include -namespace lean { +namespace lean::snappy { enum class SnappyError { UNCOMPRESS_TOO_LONG, UNCOMPRESS_INVALID, + UNCOMPRESS_TRUNCATED, + UNCOMPRESS_UNKNOWN_IDENTIFIER, + UNCOMPRESS_UNKNOWN_TYPE, + UNCOMPRESS_CRC_MISMATCH, }; Q_ENUM_ERROR_CODE(SnappyError) { using E = decltype(e); @@ -23,20 +30,42 @@ namespace lean { return "SnappyError::UNCOMPRESS_TOO_LONG"; case E::UNCOMPRESS_INVALID: return "SnappyError::UNCOMPRESS_INVALID"; + case E::UNCOMPRESS_TRUNCATED: + return "SnappyError::UNCOMPRESS_TRUNCATED"; + case E::UNCOMPRESS_UNKNOWN_IDENTIFIER: + return "SnappyError::UNCOMPRESS_UNKNOWN_IDENTIFIER"; + case E::UNCOMPRESS_UNKNOWN_TYPE: + return "SnappyError::UNCOMPRESS_UNKNOWN_TYPE"; + case E::UNCOMPRESS_CRC_MISMATCH: + return "SnappyError::UNCOMPRESS_CRC_MISMATCH"; } abort(); } - inline qtils::ByteVec snappyCompress(qtils::BytesIn input) { + constexpr size_t kHeaderSize = 4; + constexpr auto kMaxBlockSize = size_t{1} << 16; + constexpr auto kDefaultMaxSize = size_t{4} << 20; + + constexpr qtils::ByteArr<6> kStreamIdentifier{'s', 'N', 'a', 'P', 'p', 'Y'}; + + enum ChunkType : uint8_t { + Stream = 0xFF, + Compressed = 0x00, + Uncompressed = 0x01, + Padding = 0xFE, + }; + + inline qtils::ByteVec compress(qtils::BytesIn input) { std::string compressed; - snappy::Compress(qtils::byte2str(input.data()), input.size(), &compressed); + ::snappy::Compress( + qtils::byte2str(input.data()), input.size(), &compressed); return qtils::ByteVec{qtils::str2byte(std::as_const(compressed))}; } - inline outcome::result snappyUncompress( - qtils::BytesIn compressed, size_t max_size = 4 << 20) { + inline outcome::result uncompress( + qtils::BytesIn compressed, size_t max_size = kDefaultMaxSize) { size_t size = 0; - if (not snappy::GetUncompressedLength( + if (not ::snappy::GetUncompressedLength( qtils::byte2str(compressed.data()), compressed.size(), &size)) { return SnappyError::UNCOMPRESS_INVALID; } @@ -44,11 +73,86 @@ namespace lean { return SnappyError::UNCOMPRESS_TOO_LONG; } std::string uncompressed; - if (not snappy::Uncompress(qtils::byte2str(compressed.data()), - compressed.size(), - &uncompressed)) { + if (not ::snappy::Uncompress(qtils::byte2str(compressed.data()), + compressed.size(), + &uncompressed)) { return SnappyError::UNCOMPRESS_INVALID; } return qtils::ByteVec{qtils::str2byte(std::as_const(uncompressed))}; } -} // namespace lean + + using Crc32 = qtils::ByteArr<4>; + inline Crc32 hashCrc32(qtils::BytesIn input) { + auto v = crc32c::Crc32c(input.data(), input.size()); + v = ((v >> 15) | (v << 17)) + 0xa282ead8; + Crc32 crc; + boost::endian::store_little_u32(crc.data(), v); + return crc; + } + + inline qtils::ByteVec compressFramed(qtils::BytesIn input) { + qtils::ByteVec framed; + auto write_header = [&](ChunkType type, size_t size) { + framed.putUint8(type); + qtils::ByteArr<3> size_bytes; + boost::endian::store_little_u24(size_bytes.data(), size); + framed.put(size_bytes); + }; + write_header(ChunkType::Stream, kStreamIdentifier.size()); + framed.put(kStreamIdentifier); + while (not input.empty()) { + auto chunk = input.first(std::min(input.size(), kMaxBlockSize)); + auto crc = hashCrc32(chunk); + input = input.subspan(chunk.size()); + auto compressed = compress(chunk); + write_header(ChunkType::Compressed, Crc32::size() + compressed.size()); + framed.put(crc); + framed.put(compressed); + } + return framed; + } + + inline outcome::result uncompressFramed( + qtils::BytesIn compressed, size_t max_size = kDefaultMaxSize) { + qtils::ByteVec result; + while (not compressed.empty()) { + if (compressed.size() < kHeaderSize) { + return SnappyError::UNCOMPRESS_TRUNCATED; + } + auto type = ChunkType{compressed[0]}; + auto size = boost::endian::load_little_u24(compressed.data() + 1); + if (compressed.size() < kHeaderSize + size) { + return SnappyError::UNCOMPRESS_TRUNCATED; + } + auto content = compressed.subspan(kHeaderSize, size); + compressed = compressed.subspan(kHeaderSize + size); + if (type == ChunkType::Stream) { + if (qtils::ByteView{content} != kStreamIdentifier) { + return SnappyError::UNCOMPRESS_UNKNOWN_IDENTIFIER; + } + } else if (type == ChunkType::Compressed + or type == ChunkType::Uncompressed) { + qtils::ByteVec buffer; + auto expected_crc = content.first(Crc32::size()); + auto uncompressed = content.subspan(Crc32::size()); + if (type == ChunkType::Compressed) { + BOOST_OUTCOME_TRY( + buffer, + uncompress(uncompressed, + libp2p::saturating_sub(max_size, result.size()))); + uncompressed = buffer; + } + auto actual_crc = hashCrc32(uncompressed); + if (qtils::ByteView{actual_crc} != expected_crc) { + return SnappyError::UNCOMPRESS_CRC_MISMATCH; + } + result.put(uncompressed); + } else if (type == ChunkType::Padding) { + // skip padding + } else { + return SnappyError::UNCOMPRESS_UNKNOWN_TYPE; + } + } + return result; + } +} // namespace lean::snappy diff --git a/tests/unit/serde/CMakeLists.txt b/tests/unit/serde/CMakeLists.txt index ea7aa76a..c7c1a84b 100644 --- a/tests/unit/serde/CMakeLists.txt +++ b/tests/unit/serde/CMakeLists.txt @@ -7,8 +7,14 @@ addtest(enr_test enr_test.cpp ) - target_link_libraries(enr_test enr ) +addtest(snappy_test + snappy_test.cpp +) +target_link_libraries(snappy_test + snappy +) + diff --git a/tests/unit/serde/snappy_test.cpp b/tests/unit/serde/snappy_test.cpp new file mode 100644 index 00000000..24c037da --- /dev/null +++ b/tests/unit/serde/snappy_test.cpp @@ -0,0 +1,22 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "serde/snappy.hpp" + +#include + +TEST(SnappyTest, Framed) { + auto compressed = + qtils::ByteVec::fromHex( + "ff060000734e61507059002f00009243dc2a5080215db8fcf7f8ea795d9c370d0654" + "cc7822936053f7a0b583213792bd483191e1000d0104215d962800") + .value(); + auto uncompressed = lean::snappy::uncompressFramed(compressed).value(); + auto compressed2 = lean::snappy::compressFramed(uncompressed); + // c++ compressor may return different bytes than rust compressor + auto uncompressed2 = lean::snappy::uncompressFramed(compressed2).value(); + EXPECT_EQ(uncompressed2, uncompressed); +} diff --git a/vcpkg.json b/vcpkg.json index dc16aaaa..dc92f53a 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -8,6 +8,7 @@ "boost-program-options", "boost-property-tree", "cppcodec", + "crc32c", "leanp2p", "prometheus-cpp", "qdrvm-crates",