diff --git a/.ci/.env b/.ci/.env index d17091b..46e9d13 100644 --- a/.ci/.env +++ b/.ci/.env @@ -6,6 +6,7 @@ LINUX_PACKAGES="make \ curl \ git \ libtool \ + nasm \ ninja-build \ pkg-config \ python3.12 \ @@ -23,6 +24,7 @@ MACOS_PACKAGES="make \ curl \ git \ libtool \ + nasm \ ninja \ pkg-config \ python@3.12 \ diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a9edbd..8c32f62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,17 +30,24 @@ find_package(Python3 REQUIRED) find_package(PkgConfig REQUIRED) #pkg_check_modules(libb2 REQUIRED IMPORTED_TARGET GLOBAL libb2) -find_package(Boost CONFIG REQUIRED COMPONENTS algorithm outcome program_options property_tree) -find_package(fmt CONFIG REQUIRED) -find_package(yaml-cpp CONFIG REQUIRED) -find_package(sszpp CONFIG REQUIRED) -find_package(soralog CONFIG REQUIRED) +find_package(Boost CONFIG REQUIRED COMPONENTS algorithm filesystem outcome program_options property_tree random) find_package(Boost.DI CONFIG REQUIRED) -find_package(qtils CONFIG REQUIRED) -find_package(prometheus-cpp CONFIG REQUIRED) +find_package(fmt CONFIG REQUIRED) +find_package(hashtree CONFIG REQUIRED) +find_package(libp2p CONFIG REQUIRED) +find_package(libsecp256k1 CONFIG REQUIRED) +find_package(lsquic CONFIG REQUIRED) find_package(OpenSSL CONFIG REQUIRED) +find_package(prometheus-cpp CONFIG REQUIRED) +find_package(Protobuf CONFIG REQUIRED) +find_package(qtils CONFIG REQUIRED) find_package(RocksDB CONFIG REQUIRED) -find_package(hashtree CONFIG REQUIRED) +find_package(snappy CONFIG REQUIRED) +find_package(soralog CONFIG REQUIRED) +find_package(sszpp CONFIG REQUIRED) +find_package(yaml-cpp CONFIG REQUIRED) + +include(vcpkg-overlay/cppcodec.cmake) # TODO Temporarily commented out until gcc is updated (gcc-13 crashes because of this). # if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ca71f33..5f951e3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,8 @@ # Executables (should contain `main()` function) add_subdirectory(executable) +add_subdirectory(serde) + # Application's thinks add_subdirectory(app) diff --git a/src/blockchain/block_tree.hpp b/src/blockchain/block_tree.hpp index c0ae685..4be1b9e 100644 --- a/src/blockchain/block_tree.hpp +++ b/src/blockchain/block_tree.hpp @@ -10,8 +10,10 @@ #include "types/justification.hpp" namespace lean { - struct BlockBody; struct Block; + struct BlockBody; + struct SignedBlock; + struct StatusMessage; } // namespace lean namespace lean::blockchain { @@ -150,6 +152,26 @@ namespace lean::blockchain { * @return hash of the block */ [[nodiscard]] virtual BlockIndex lastFinalized() const = 0; + + /** + * Get message for "/leanconsensus/req/status/1/ssz_snappy" protocol. + * Returns hash and slot for finalized and best blocks. + */ + virtual StatusMessage getStatusMessage() const = 0; + + /** + * Get `SignedBlock` for "/leanconsensus/req/blocks_by_root/1/ssz_snappy" + * protocol. + */ + virtual outcome::result> tryGetSignedBlock( + const BlockHash block_hash) const = 0; + + // TODO(turuslan): state transition function + /** + * Import pre-sorted batch of `SignedBlock`. + * May change best and finalized block. + */ + virtual void import(std::vector blocks) = 0; }; } // namespace lean::blockchain diff --git a/src/blockchain/impl/block_storage_impl.cpp b/src/blockchain/impl/block_storage_impl.cpp index 6bdc480..1e96584 100644 --- a/src/blockchain/impl/block_storage_impl.cpp +++ b/src/blockchain/impl/block_storage_impl.cpp @@ -169,7 +169,7 @@ namespace lean::blockchain { outcome::result BlockStorageImpl::putBlockHeader( const BlockHeader &header) { OUTCOME_TRY(encoded_header, encode(header)); - header.updateHash(*hasher_); + header.updateHash(); const auto &block_hash = header.hash(); OUTCOME_TRY(putToSpace(*storage_, storage::Space::Header, diff --git a/src/blockchain/impl/block_tree_impl.cpp b/src/blockchain/impl/block_tree_impl.cpp index f8d55bd..c94f098 100644 --- a/src/blockchain/impl/block_tree_impl.cpp +++ b/src/blockchain/impl/block_tree_impl.cpp @@ -21,6 +21,8 @@ #include "tests/testutil/literals.hpp" #include "types/block.hpp" #include "types/block_header.hpp" +#include "types/signed_block.hpp" +#include "types/status_message.hpp" namespace lean::blockchain { BlockTreeImpl::SafeBlockTreeData::SafeBlockTreeData(BlockTreeData data) @@ -126,13 +128,8 @@ namespace lean::blockchain { } const auto &parent = parent_opt.value(); - BlockHeader header; - header.slot = block.slot; - header.proposer_index = block.proposer_index; - header.parent_root = block.parent_root; - header.state_root = block.state_root; - header.body_root = {}; // ssz::hash_tree_root(block.body); - header.updateHash(*p.hasher_); + auto header = block.getHeader(); + header.updateHash(); SL_DEBUG(log_, "Adding block {}", header.index()); @@ -797,6 +794,48 @@ namespace lean::blockchain { [&](const BlockTreeData &p) { return getLastFinalizedNoLock(p); }); } + StatusMessage BlockTreeImpl::getStatusMessage() const { + auto finalized = lastFinalized(); + auto head = bestBlock(); + return StatusMessage{ + .finalized = {.root = finalized.hash, .slot = finalized.slot}, + .head = {.root = head.hash, .slot = head.slot}, + }; + } + + outcome::result> BlockTreeImpl::tryGetSignedBlock( + const BlockHash block_hash) const { + auto header_res = getBlockHeader(block_hash); + if (not header_res.has_value()) { + return std::nullopt; + } + auto &header = header_res.value(); + auto body_res = getBlockBody(block_hash); + if (not body_res.has_value()) { + return std::nullopt; + } + auto &body = body_res.value(); + return SignedBlock{ + .message = + { + .slot = header.slot, + .proposer_index = header.proposer_index, + .parent_root = header.parent_root, + .state_root = header.state_root, + .body = std::move(body), + }, + // TODO(turuslan): signature + .signature = {}, + }; + } + + void BlockTreeImpl::import(std::vector blocks) { + for (auto &block : blocks) { + // TODO(turuslan): signature + std::ignore = addBlock(block.message); + } + } + outcome::result BlockTreeImpl::reorgAndPrune( const BlockTreeData &p, const ReorgAndPrune &changes) { OUTCOME_TRY(p.storage_->setBlockTreeLeaves(p.tree_->leafHashes())); diff --git a/src/blockchain/impl/block_tree_impl.hpp b/src/blockchain/impl/block_tree_impl.hpp index 955f0e5..5788ee5 100644 --- a/src/blockchain/impl/block_tree_impl.hpp +++ b/src/blockchain/impl/block_tree_impl.hpp @@ -31,6 +31,10 @@ namespace lean::blockchain { class BlockTreeInitializer; } +namespace lean::crypto { + class Hasher; +} // namespace lean::crypto + namespace lean::blockchain { class BlockTreeImpl final @@ -104,6 +108,11 @@ namespace lean::blockchain { BlockIndex lastFinalized() const override; + StatusMessage getStatusMessage() const override; + outcome::result> tryGetSignedBlock( + const BlockHash block_hash) const override; + void import(std::vector blocks) override; + // BlockHeaderRepository methods outcome::result getNumberByHash( diff --git a/src/blockchain/impl/genesis_block_header_impl.cpp b/src/blockchain/impl/genesis_block_header_impl.cpp index a7c47f0..4d434a3 100644 --- a/src/blockchain/impl/genesis_block_header_impl.cpp +++ b/src/blockchain/impl/genesis_block_header_impl.cpp @@ -26,7 +26,7 @@ namespace lean::blockchain { res.error()); qtils::raise_on_err(res); } - updateHash(*hasher); + updateHash(); } } // namespace lean::blockchain diff --git a/src/injector/CMakeLists.txt b/src/injector/CMakeLists.txt index 2c69677..8353471 100644 --- a/src/injector/CMakeLists.txt +++ b/src/injector/CMakeLists.txt @@ -8,18 +8,19 @@ add_library(node_injector node_injector.cpp ) target_link_libraries(node_injector - Boost::Boost.DI - logger app_configurator - chain_spec app_state_manager application - metrics + blockchain + Boost::Boost.DI + chain_spec clock hasher - se_async + logger + metrics modules + p2p::libp2p + se_async storage - blockchain timeline ) diff --git a/src/libp2p/peer/peer_id_from.hpp b/src/libp2p/peer/peer_id_from.hpp new file mode 100644 index 0000000..ceb9567 --- /dev/null +++ b/src/libp2p/peer/peer_id_from.hpp @@ -0,0 +1,29 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include + +namespace libp2p { + /** + * Convert secp256k1 public key to `PeerId`. + */ + inline PeerId peerIdFromSecp256k1( + const crypto::secp256k1::PublicKey &public_key) { + return libp2p::PeerId::fromPublicKey( + libp2p::crypto::marshaller::KeyMarshaller{nullptr} + .marshal(libp2p::crypto::PublicKey{ + libp2p::crypto::Key::Type::Secp256k1, + qtils::ByteVec(public_key), + }) + .value()) + .value(); + } +} // namespace libp2p diff --git a/src/loaders/impl/networking_loader.hpp b/src/loaders/impl/networking_loader.hpp index 4336e78..d452b7d 100644 --- a/src/loaders/impl/networking_loader.hpp +++ b/src/loaders/impl/networking_loader.hpp @@ -17,6 +17,10 @@ #include "modules/shared/networking_types.tmp.hpp" #include "se/subscription.hpp" +namespace lean::blockchain { + class BlockTree; +} // namespace lean::blockchain + namespace lean::loaders { class NetworkingLoader final @@ -24,21 +28,22 @@ namespace lean::loaders { public Loader, public modules::NetworkingLoader { log::Logger logger_; + qtils::SharedRef block_tree_; std::shared_ptr> on_init_complete_; std::shared_ptr> on_loading_finished_; - std::shared_ptr< - BaseSubscriber>> - on_block_request_; + ON_DISPATCH_SUBSCRIPTION(SendSignedBlock); + ON_DISPATCH_SUBSCRIPTION(SendSignedVote); public: NetworkingLoader(std::shared_ptr logsys, - std::shared_ptr se_manager) + std::shared_ptr se_manager, + qtils::SharedRef block_tree) : Loader(std::move(logsys), std::move(se_manager)), - logger_(logsys_->getLogger("Networking", "networking_module")) {} + logger_(logsys_->getLogger("Networking", "networking_module")), + block_tree_{std::move(block_tree)} {} NetworkingLoader(const NetworkingLoader &) = delete; NetworkingLoader &operator=(const NetworkingLoader &) = delete; @@ -51,14 +56,15 @@ namespace lean::loaders { get_module() ->getFunctionFromLibrary, modules::NetworkingLoader &, - std::shared_ptr>( + std::shared_ptr, + qtils::SharedRef>( "query_module_instance"); if (not module_accessor) { return; } - auto module_internal = (*module_accessor)(*this, logsys_); + auto module_internal = (*module_accessor)(*this, logsys_, block_tree_); on_init_complete_ = se::SubscriberCreator::template create< EventTypes::NetworkingIsLoaded>( @@ -83,20 +89,8 @@ namespace lean::loaders { } }); - on_block_request_ = se::SubscriberCreator< - qtils::Empty, - std::shared_ptr>:: - template create( - *se_manager_, - SubscriptionEngineHandlers::kTest, - [module_internal, this](auto &, const auto &msg) { - if (auto m = module_internal.lock()) { - SL_TRACE( - logger_, "Handle BlockRequest; rid={}", msg->ctx.rid); - m->on_block_request(msg); - } - }); - + ON_DISPATCH_SUBSCRIBE(SendSignedBlock); + ON_DISPATCH_SUBSCRIBE(SendSignedVote); se_manager_->notify(lean::EventTypes::NetworkingIsLoaded); } @@ -113,16 +107,21 @@ namespace lean::loaders { se_manager_->notify(lean::EventTypes::PeerDisconnected, msg); } - void dispatch_block_announce( - std::shared_ptr msg) override { - SL_TRACE(logger_, "Dispatch BlockAnnounceReceived"); - se_manager_->notify(lean::EventTypes::BlockAnnounceReceived, msg); + void dispatch_StatusMessageReceived( + std::shared_ptr message) + override { + SL_TRACE(logger_, + "Dispatch StatusMessageReceived peer={} finalized={} head={}", + message->from_peer, + message->notification.finalized.slot, + message->notification.head.slot); + dispatchDerive(*se_manager_, message); } - void dispatch_block_response( - std::shared_ptr msg) override { - SL_TRACE(logger_, "Dispatch BlockResponse; rid={}", msg->ctx.rid); - se_manager_->notify(lean::EventTypes::BlockResponse, std::move(msg)); + void dispatch_SignedVoteReceived( + std::shared_ptr message) override { + SL_TRACE(logger_, "Dispatch SignedVoteReceived"); + dispatchDerive(*se_manager_, message); } }; } // namespace lean::loaders diff --git a/src/loaders/impl/production_loader.hpp b/src/loaders/impl/production_loader.hpp index 70bc090..be6dfdd 100644 --- a/src/loaders/impl/production_loader.hpp +++ b/src/loaders/impl/production_loader.hpp @@ -131,6 +131,11 @@ namespace lean::loaders { void dispatch_block_produced(std::shared_ptr msg) override { se_manager_->notify(EventTypes::BlockProduced, msg); } + + void dispatch_SendSignedBlock( + std::shared_ptr message) override { + dispatchDerive(*se_manager_, message); + } }; } // namespace lean::loaders diff --git a/src/loaders/impl/synchronizer_loader.hpp b/src/loaders/impl/synchronizer_loader.hpp index e337ab3..04a855f 100644 --- a/src/loaders/impl/synchronizer_loader.hpp +++ b/src/loaders/impl/synchronizer_loader.hpp @@ -25,16 +25,6 @@ namespace lean::loaders { using InitCompleteSubscriber = BaseSubscriber; std::shared_ptr on_init_complete_; - std::shared_ptr< - BaseSubscriber>> - on_block_announce_; - - std::shared_ptr< - BaseSubscriber>> - on_block_response_; - public: SynchronizerLoader(std::shared_ptr logsys, std::shared_ptr se_manager) @@ -72,41 +62,8 @@ namespace lean::loaders { } }); - on_block_announce_ = se::SubscriberCreator< - qtils::Empty, - std::shared_ptr>:: - create( - *se_manager_, - SubscriptionEngineHandlers::kTest, - [module_internal, this](auto &, const auto &msg) { - if (auto m = module_internal.lock()) { - SL_TRACE(logger_, "Handle BlockAnnounceReceived"); - m->on_block_announce(msg); - } - }); - - on_block_response_ = se::SubscriberCreator< - qtils::Empty, - std::shared_ptr>:: - create( - *se_manager_, - SubscriptionEngineHandlers::kTest, - [module_internal, this](auto &, const auto &msg) { - if (auto m = module_internal.lock()) { - SL_TRACE( - logger_, "Handle BlockResponse; rid={}", msg->ctx.rid); - m->on_block_response(msg); - } - }); - se_manager_->notify(EventTypes::SynchronizerIsLoaded); } - - void dispatch_block_request( - std::shared_ptr msg) override { - SL_TRACE(logger_, "Dispatch BlockRequest; rid={}", msg->ctx.rid); - se_manager_->notify(EventTypes::BlockRequest, msg); - } }; } // namespace lean::loaders diff --git a/src/log/logger.hpp b/src/log/logger.hpp index 4bc3c37..f30d902 100644 --- a/src/log/logger.hpp +++ b/src/log/logger.hpp @@ -80,6 +80,10 @@ namespace lean::log { return logging_system_->resetLevelOfLogger(logger_name); } + auto &getSoralog() const { + return logging_system_; + } + private: std::shared_ptr logging_system_; }; diff --git a/src/modules/module_loader.cpp b/src/modules/module_loader.cpp index 46f6562..6478281 100644 --- a/src/modules/module_loader.cpp +++ b/src/modules/module_loader.cpp @@ -45,7 +45,7 @@ namespace lean::modules { if (fs::is_directory(entry)) { OUTCOME_TRY(recursive_search(entry_path, modules)); - } else if (fs::is_regular_file(entry) + } else if ((fs::is_regular_file(entry) or fs::is_symlink(entry)) && entry_path.extension() == ".so") { OUTCOME_TRY(load_module(entry_path.string(), modules)); } diff --git a/src/modules/networking/CMakeLists.txt b/src/modules/networking/CMakeLists.txt index 1d6cda2..155bb80 100644 --- a/src/modules/networking/CMakeLists.txt +++ b/src/modules/networking/CMakeLists.txt @@ -14,7 +14,9 @@ add_lean_module(networking DEFINITIONS SOME_FLAG=1 LIBRARIES + p2p::libp2p qtils::qtils + Snappy::snappy soralog::soralog - hashtree + sszpp ) \ No newline at end of file diff --git a/src/modules/networking/interfaces.hpp b/src/modules/networking/interfaces.hpp index fec0483..02c3607 100644 --- a/src/modules/networking/interfaces.hpp +++ b/src/modules/networking/interfaces.hpp @@ -8,6 +8,8 @@ #include +#include "modules/shared/macro.hpp" + namespace lean::modules { struct NetworkingLoader { @@ -19,11 +21,10 @@ namespace lean::modules { virtual void dispatch_peer_disconnected( std::shared_ptr msg) = 0; - virtual void dispatch_block_announce( - std::shared_ptr msg) = 0; - - virtual void dispatch_block_response( - std::shared_ptr msg) = 0; + virtual void dispatch_StatusMessageReceived( + std::shared_ptr message) = 0; + virtual void dispatch_SignedVoteReceived( + std::shared_ptr message) = 0; }; struct Networking { @@ -33,8 +34,10 @@ namespace lean::modules { virtual void on_loading_is_finished() = 0; - virtual void on_block_request( - std::shared_ptr msg) = 0; + virtual void on_dispatch_SendSignedBlock( + std::shared_ptr message) = 0; + virtual void on_dispatch_SendSignedVote( + std::shared_ptr message) = 0; }; } // namespace lean::modules diff --git a/src/modules/networking/module.cpp b/src/modules/networking/module.cpp index 1e4be59..c1052b5 100644 --- a/src/modules/networking/module.cpp +++ b/src/modules/networking/module.cpp @@ -9,6 +9,10 @@ #define MODULE_C_API extern "C" __attribute__((visibility("default"))) #define MODULE_API __attribute__((visibility("default"))) +namespace lean::blockchain { + class BlockTree; +} // namespace lean::blockchain + MODULE_C_API const char *loader_id() { return "NetworkingLoader"; } @@ -24,10 +28,11 @@ static std::shared_ptr module_instance; MODULE_C_API std::weak_ptr query_module_instance( lean::modules::NetworkingLoader &loader, - std::shared_ptr logsys) { + std::shared_ptr logsys, + qtils::SharedRef block_tree) { if (!module_instance) { - module_instance = - lean::modules::NetworkingImpl::create_shared(loader, std::move(logsys)); + module_instance = lean::modules::NetworkingImpl::create_shared( + loader, std::move(logsys), block_tree); } return module_instance; } diff --git a/src/modules/networking/networking.cpp b/src/modules/networking/networking.cpp index 0316c1b..4af2856 100644 --- a/src/modules/networking/networking.cpp +++ b/src/modules/networking/networking.cpp @@ -7,35 +7,483 @@ #include "modules/networking/networking.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "blockchain/block_tree.hpp" +#include "serde/serialization.hpp" +#include "serde/snappy.hpp" +#include "types/block_request.hpp" +#include "utils/__debug_env.hpp" +#include "utils/sample_peer.hpp" namespace lean::modules { + // TODO(turuslan): gossip [from,seqno,signature,key]=None + + inline auto gossipTopic(std::string_view type) { + return std::format("/leanconsensus/devnet0/{}/ssz_snappy", type); + } + + auto encodeSszSnappy(const auto &t) { + return snappyCompress(encode(t).value()); + } + + template + outcome::result decodeSszSnappy(qtils::BytesIn compressed) { + BOOST_OUTCOME_TRY(auto uncompressed, snappyUncompress(compressed)); + return decode(uncompressed); + } + + libp2p::protocol::gossip::MessageId gossipMessageId( + const libp2p::protocol::gossip::Message &message) { + constexpr qtils::ByteArr<4> MESSAGE_DOMAIN_INVALID_SNAPPY{0, 0, 0, 0}; + constexpr qtils::ByteArr<4> MESSAGE_DOMAIN_VALID_SNAPPY{1, 0, 0, 0}; + libp2p::crypto::Sha256 hasher; + auto hash_topic = [&] { + qtils::ByteArr size; + boost::endian::store_little_u64(size.data(), message.topic.size()); + hasher.write(size).value(); + hasher.write(message.topic).value(); + }; + if (auto uncompressed_res = snappyUncompress(message.data)) { + auto &uncompressed = uncompressed_res.value(); + hash_topic(); + hasher.write(MESSAGE_DOMAIN_VALID_SNAPPY).value(); + hasher.write(uncompressed).value(); + } else { + hasher.write(MESSAGE_DOMAIN_INVALID_SNAPPY).value(); + hash_topic(); + hasher.write(message.data).value(); + } + auto hash = hasher.digest().value(); + hash.resize(20); + return hash; + } + + class StatusProtocol : public std::enable_shared_from_this, + public libp2p::protocol::BaseProtocol { + public: + using GetStatus = std::function; + using OnStatus = std::function; + + StatusProtocol(std::shared_ptr io_context, + std::shared_ptr host, + GetStatus get_status, + OnStatus on_status) + : io_context_{std::move(io_context)}, + host_{std::move(host)}, + get_status_{std::move(get_status)}, + on_status_{std::move(on_status)} {} + + // BaseProtocol + libp2p::StreamProtocols getProtocolIds() const override { + return {"/leanconsensus/req/status/1/ssz_snappy"}; + } + void handle(std::shared_ptr stream) override { + libp2p::coroSpawn( + *io_context_, + [self{shared_from_this()}, stream]() -> libp2p::Coro { + std::ignore = co_await self->coroHandle(stream); + }); + } + + void start() { + host_->listenProtocol(shared_from_this()); + } + + libp2p::CoroOutcome connect( + std::shared_ptr connection) { + BOOST_OUTCOME_CO_TRY( + auto stream, co_await host_->newStream(connection, getProtocolIds())); + BOOST_OUTCOME_CO_TRY(co_await coroHandle(stream)); + co_return outcome::success(); + } + + private: + libp2p::CoroOutcome coroHandle( + std::shared_ptr stream) { + auto peer_id = stream->remotePeerId(); + BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( + stream, encodeSszSnappy(get_status_()))); + qtils::ByteVec encoded; + BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); + BOOST_OUTCOME_CO_TRY(auto status, + decodeSszSnappy(encoded)); + on_status_(messages::StatusMessageReceived{ + .from_peer = peer_id, + .notification = status, + }); + co_return outcome::success(); + } + + std::shared_ptr io_context_; + std::shared_ptr host_; + GetStatus get_status_; + OnStatus on_status_; + }; + + class BlockRequestProtocol + : public std::enable_shared_from_this, + public libp2p::protocol::BaseProtocol { + public: + BlockRequestProtocol(std::shared_ptr io_context, + std::shared_ptr host, + qtils::SharedRef block_tree) + : io_context_{std::move(io_context)}, + host_{std::move(host)}, + block_tree_{std::move(block_tree)} {} + + // BaseProtocol + libp2p::StreamProtocols getProtocolIds() const override { + return {"/leanconsensus/req/blocks_by_root/1/ssz_snappy"}; + } + void handle(std::shared_ptr stream) override { + libp2p::coroSpawn( + *io_context_, + [self{shared_from_this()}, stream]() -> libp2p::Coro { + std::ignore = co_await self->coroRespond(stream); + }); + } + + void start() { + host_->listenProtocol(shared_from_this()); + } + + libp2p::CoroOutcome request(libp2p::PeerId peer_id, + BlockRequest request) { + BOOST_OUTCOME_CO_TRY( + auto stream, co_await host_->newStream(peer_id, getProtocolIds())); + BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( + stream, encodeSszSnappy(request))); + qtils::ByteVec encoded; + BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); + BOOST_OUTCOME_CO_TRY(auto response, + decodeSszSnappy(encoded)); + co_return response; + } + + private: + libp2p::CoroOutcome coroRespond( + std::shared_ptr stream) { + qtils::ByteVec encoded; + BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); + BOOST_OUTCOME_CO_TRY(auto request, + decodeSszSnappy(encoded)); + BlockResponse response; + for (auto &block_hash : request.blocks) { + BOOST_OUTCOME_CO_TRY(auto block, + block_tree_->tryGetSignedBlock(block_hash)); + if (block.has_value()) { + response.blocks.push_back(std::move(block.value())); + } + } + BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( + stream, encodeSszSnappy(response))); + co_return outcome::success(); + } + + std::shared_ptr io_context_; + std::shared_ptr host_; + qtils::SharedRef block_tree_; + }; NetworkingImpl::NetworkingImpl( NetworkingLoader &loader, - qtils::SharedRef logging_system) + qtils::SharedRef logging_system, + qtils::SharedRef block_tree) : loader_(loader), - logger_(logging_system->getLogger("Networking", "networking_module")) {} + logger_(logging_system->getLogger("Networking", "networking_module")), + block_tree_{std::move(block_tree)} { + libp2p::log::setLoggingSystem(logging_system->getSoralog()); + } + + NetworkingImpl::~NetworkingImpl() { + if (io_thread_.has_value()) { + io_context_->stop(); + io_thread_->join(); + } + } void NetworkingImpl::on_loaded_success() { SL_INFO(logger_, "Loaded success"); + + SamplePeer sample_peer{getPeerIndex()}; + + auto injector = qtils::toSharedPtr(libp2p::injector::makeHostInjector( + libp2p::injector::useKeyPair(sample_peer.keypair), + libp2p::injector::useTransportAdaptors< + libp2p::transport::QuicTransport>())); + injector_ = injector; + io_context_ = injector->create>(); + + auto host = injector->create>(); + + if (auto r = host->listen(sample_peer.listen); not r.has_value()) { + SL_WARN(logger_, "listen {} error: {}", sample_peer.listen, r.error()); + } + host->start(); + + auto on_peer_connected = + [weak_self{weak_from_this()}]( + std::weak_ptr + weak_connection) { + auto connection = weak_connection.lock(); + if (not connection) { + return; + } + auto self = weak_self.lock(); + if (not self) { + return; + } + auto peer_id = connection->remotePeer(); + self->loader_.dispatch_peer_connected( + qtils::toSharedPtr(messages::PeerConnectedMessage{peer_id})); + if (connection->isInitiator()) { + libp2p::coroSpawn( + *self->io_context_, + [status_protocol{self->status_protocol_}, + connection]() -> libp2p::Coro { + std::ignore = co_await status_protocol->connect(connection); + }); + } + }; + auto on_peer_disconnected = + [weak_self{weak_from_this()}](libp2p::PeerId peer_id) { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->loader_.dispatch_peer_disconnected( + qtils::toSharedPtr(messages::PeerDisconnectedMessage{peer_id})); + }; + on_peer_connected_sub_ = + host->getBus() + .getChannel() + .subscribe(on_peer_connected); + on_peer_disconnected_sub_ = + host->getBus() + .getChannel() + .subscribe(on_peer_disconnected); + + status_protocol_ = std::make_shared( + io_context_, + host, + [block_tree{block_tree_}]() { return block_tree->getStatusMessage(); }, + [weak_self{weak_from_this()}](messages::StatusMessageReceived message) { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->receiveStatus(message); + }); + status_protocol_->start(); + + block_request_protocol_ = + std::make_shared(io_context_, host, block_tree_); + block_request_protocol_->start(); + + gossip_ = + injector->create>(); + gossip_->start(); + + gossip_blocks_topic_ = gossipSubscribe( + "block", [weak_self{weak_from_this()}](SignedBlock &&block) { + auto self = weak_self.lock(); + if (not self) { + return; + } + block.message.setHash(); + self->receiveBlock(std::nullopt, std::move(block)); + }); + gossip_votes_topic_ = gossipSubscribe( + "vote", [weak_self{weak_from_this()}](SignedVote &&vote) { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->loader_.dispatch_SignedVoteReceived( + std::make_shared(std::move(vote))); + }); + + if (sample_peer.index != 0) { + libp2p::coroSpawn( + *io_context_, + [weak_self{weak_from_this()}, host]() -> libp2p::Coro { + SamplePeer sample_peer{0}; + + if (auto r = co_await host->connect(sample_peer.connect_info); + not r.has_value()) { + auto self = weak_self.lock(); + if (not self) { + co_return; + } + SL_WARN(self->logger_, + "connect {} error: {}", + sample_peer.connect, + r.error()); + } + }); + } + + io_thread_.emplace([io_context{io_context_}] { + auto work_guard = boost::asio::make_work_guard(*io_context); + io_context->run(); + }); } void NetworkingImpl::on_loading_is_finished() { SL_INFO(logger_, "Loading is finished"); + } + + void NetworkingImpl::on_dispatch_SendSignedBlock( + std::shared_ptr message) { + boost::asio::post(*io_context_, [self{shared_from_this()}, message] { + self->gossip_blocks_topic_->publish( + encodeSszSnappy(message->notification)); + }); + } - // tmp entry point for experiments - auto x = std::make_shared(); - loader_.dispatch_block_announce(std::move(x)); + void NetworkingImpl::on_dispatch_SendSignedVote( + std::shared_ptr message) { + boost::asio::post(*io_context_, [self{shared_from_this()}, message] { + self->gossip_votes_topic_->publish( + encodeSszSnappy(message->notification)); + }); } - void NetworkingImpl::on_block_request( - std::shared_ptr msg) { - SL_INFO(logger_, "Block requested"); + template + std::shared_ptr + NetworkingImpl::gossipSubscribe(std::string_view type, auto f) { + auto topic = gossip_->subscribe(gossipTopic(type)); + libp2p::coroSpawn(*io_context_, + [topic, f{std::move(f)}]() -> libp2p::Coro { + while (auto raw_result = co_await topic->receive()) { + auto &raw = raw_result.value(); + if (auto uncompressed_res = snappyUncompress(raw)) { + auto &uncompressed = uncompressed_res.value(); + if (auto r = decode(uncompressed)) { + f(std::move(r.value())); + } + } + } + }); + return topic; + } - // tmp entry point for experiments - auto x = std::make_shared( - messages::BlockResponseMessage{.ctx = msg->ctx, .result = Block{}}); - loader_.dispatch_block_response(std::move(x)); - }; + void NetworkingImpl::receiveStatus( + const messages::StatusMessageReceived &message) { + BlockIndex finalized{ + message.notification.finalized.slot, + message.notification.finalized.root, + }; + BlockIndex head{ + message.notification.head.slot, + message.notification.head.root, + }; + if (not statusFinalizedIsGood(finalized) + or not statusFinalizedIsGood(head)) { + return; + } + if (not block_cache_.contains(head.hash) and not block_tree_->has(head.hash) + and head.slot > block_tree_->lastFinalized().slot) { + SL_TRACE(logger_, "receiveStatus {} => request", head.slot); + requestBlock(message.from_peer, head.hash); + } + loader_.dispatch_StatusMessageReceived(qtils::toSharedPtr(message)); + } + + void NetworkingImpl::requestBlock(const libp2p::PeerId &peer_id, + const BlockHash &block_hash) { + libp2p::coroSpawn(*io_context_, + [self{shared_from_this()}, + peer_id, + block_hash]() -> libp2p::Coro { + auto response_res = + co_await self->block_request_protocol_->request( + peer_id, {.blocks = {{block_hash}}}); + if (response_res.has_value()) { + auto &blocks = response_res.value().blocks.data(); + if (blocks.empty()) { + co_return; + } + auto &block = blocks.at(0); + block.message.setHash(); + self->receiveBlock(peer_id, std::move(block)); + } + }); + } + // TODO(turuslan): detect finalized change + void NetworkingImpl::receiveBlock(std::optional from_peer, + SignedBlock &&block) { + auto slot_hash = block.message.slotHash(); + SL_TRACE(logger_, "receiveBlock {}", slot_hash.slot); + auto remove = [&](auto f) { + std::vector queue{slot_hash.hash}; + while (not queue.empty()) { + auto hash = queue.back(); + queue.pop_back(); + auto [begin, end] = block_children_.equal_range(hash); + for (auto it = begin; it != end; it = block_children_.erase(it)) { + f(it->second); + queue.emplace_back(it->second); + } + } + }; + auto parent_hash = block.message.parent_root; + if (block_cache_.contains(slot_hash.hash)) { + SL_TRACE(logger_, "receiveBlock {} => ignore cached", slot_hash.slot); + return; + } + if (block_tree_->has(slot_hash.hash)) { + SL_TRACE(logger_, "receiveBlock {} => ignore db", slot_hash.slot); + return; + } + if (slot_hash.slot <= block_tree_->lastFinalized().slot) { + SL_TRACE( + logger_, "receiveBlock {} => ignore finalized fork", slot_hash.slot); + remove( + [&](const BlockHash &block_hash) { block_cache_.erase(block_hash); }); + return; + } + if (block_tree_->has(parent_hash)) { + std::vector blocks{std::move(block)}; + remove([&](const BlockHash &block_hash) { + blocks.emplace_back(block_cache_.extract(block_hash).mapped()); + }); + std::string __s; + for (auto &block : blocks) { + __s += std::format(" {}", block.message.slot); + } + SL_TRACE(logger_, "receiveBlock {} => import{}", slot_hash.slot, __s); + block_tree_->import(std::move(blocks)); + return; + } + block_cache_.emplace(slot_hash.hash, std::move(block)); + block_children_.emplace(parent_hash, slot_hash.hash); + if (block_cache_.contains(parent_hash)) { + SL_TRACE(logger_, "receiveBlock {} => has parent", slot_hash.slot); + return; + } + if (not from_peer) { + return; + } + requestBlock(from_peer.value(), parent_hash); + SL_TRACE(logger_, "receiveBlock {} => request parent", slot_hash.slot); + } + + bool NetworkingImpl::statusFinalizedIsGood(const BlockIndex &slot_hash) { + if (auto expected = block_tree_->getNumberByHash(slot_hash.hash)) { + return slot_hash.slot == expected.value(); + } + return slot_hash.slot > block_tree_->lastFinalized().slot; + } } // namespace lean::modules diff --git a/src/modules/networking/networking.hpp b/src/modules/networking/networking.hpp index 6a24984..6d866f1 100644 --- a/src/modules/networking/networking.hpp +++ b/src/modules/networking/networking.hpp @@ -6,31 +6,89 @@ #pragma once +#include + +#include #include #include #include #include #include +namespace boost::asio { + class io_context; +} // namespace boost::asio + +namespace lean::blockchain { + class BlockTree; +} // namespace lean::blockchain + +namespace libp2p::protocol::gossip { + class Gossip; + class Topic; +} // namespace libp2p::protocol::gossip + namespace lean::modules { + class StatusProtocol; + class BlockRequestProtocol; + /** + * Network module. + * + * Sends produced blocks and signed votes. + * Syncs blocks from other peers. + * Receives votes from other peers. + * + * Protocols: + * - Status handshake protocol (best and finalized block info). + * - Block request protocol (`SignedBlock` by hash). + * - `SignedBlock` and `SignedVote` gossip protocol. + */ class NetworkingImpl final : public Singleton, public Networking { NetworkingImpl(NetworkingLoader &loader, - qtils::SharedRef logging_system); + qtils::SharedRef logging_system, + qtils::SharedRef block_tree); public: CREATE_SHARED_METHOD(NetworkingImpl); - void on_loaded_success() override; + ~NetworkingImpl() override; + // Networking + void on_loaded_success() override; void on_loading_is_finished() override; - - void on_block_request( - std::shared_ptr msg) override; + void on_dispatch_SendSignedBlock( + std::shared_ptr message) override; + void on_dispatch_SendSignedVote( + std::shared_ptr message) override; private: + template + std::shared_ptr gossipSubscribe( + std::string_view type, auto f); + + void receiveStatus(const messages::StatusMessageReceived &message); + void requestBlock(const libp2p::PeerId &peer_id, + const BlockHash &block_hash); + void receiveBlock(std::optional peer_id, + SignedBlock &&block); + bool statusFinalizedIsGood(const BlockIndex &slot_hash); + NetworkingLoader &loader_; log::Logger logger_; + qtils::SharedRef block_tree_; + std::shared_ptr injector_; + std::shared_ptr io_context_; + std::optional io_thread_; + libp2p::event::Handle on_peer_connected_sub_; + libp2p::event::Handle on_peer_disconnected_sub_; + std::shared_ptr status_protocol_; + std::shared_ptr block_request_protocol_; + std::shared_ptr gossip_; + std::shared_ptr gossip_blocks_topic_; + std::shared_ptr gossip_votes_topic_; + std::unordered_map block_cache_; + std::unordered_multimap block_children_; }; } // namespace lean::modules diff --git a/src/modules/production/interfaces.hpp b/src/modules/production/interfaces.hpp index 772cf18..54ddf63 100644 --- a/src/modules/production/interfaces.hpp +++ b/src/modules/production/interfaces.hpp @@ -8,6 +8,9 @@ #include +#include "modules/shared/macro.hpp" +#include "modules/shared/networking_types.tmp.hpp" + namespace lean::messages { struct SlotStarted; struct Finalized; @@ -23,6 +26,9 @@ namespace lean::modules { virtual ~ProductionLoader() = default; virtual void dispatch_block_produced(std::shared_ptr) = 0; + + virtual void dispatch_SendSignedBlock( + std::shared_ptr message) = 0; }; struct ProductionModule { diff --git a/src/modules/production/production.cpp b/src/modules/production/production.cpp index 99fcab0..0d5a20c 100644 --- a/src/modules/production/production.cpp +++ b/src/modules/production/production.cpp @@ -8,9 +8,11 @@ #include "blockchain/block_tree.hpp" #include "crypto/hasher.hpp" +#include "modules/shared/networking_types.tmp.hpp" #include "modules/shared/prodution_types.tmp.hpp" #include "types/block_data.hpp" #include "types/signed_block.hpp" +#include "utils/__debug_env.hpp" namespace lean::modules { ProductionModuleImpl::ProductionModuleImpl( @@ -38,7 +40,8 @@ namespace lean::modules { SL_INFO(logger_, "Epoch changed to {}", msg->epoch); } - auto is_producer = msg->slot % 3 == 2; // qdrvm validator indices for dev + auto producer_index = msg->slot % getValidatorCount(); + auto is_producer = getPeerIndex() == producer_index; SL_INFO(logger_, "Slot {} is started{}", @@ -48,28 +51,11 @@ namespace lean::modules { if (is_producer) { auto parent_hash = block_tree_->bestBlock().hash; // Produce block - BlockBody body; - - BlockHeader header; - header.slot = msg->slot; - header.proposer_index = 2; - header.parent_root = parent_hash; - header.state_root = {}; - header.body_root = {}; - header.updateHash(*hasher_); - - BlockData block_data; - block_data.hash = header.hash(); - block_data.header.emplace(header); - block_data.body.emplace(body); - block_data.signature = {}; - Block block; block.slot = msg->slot; - block.proposer_index = 2; + block.proposer_index = producer_index; block.parent_root = parent_hash; block.state_root = {}; - block.body = body; // Add a block into the block tree auto res = block_tree_->addBlock(block); @@ -81,6 +67,11 @@ namespace lean::modules { // Notify subscribers loader_.dispatch_block_produced(std::make_shared(block)); + + // TODO(turuslan): signature + loader_.dispatch_SendSignedBlock( + std::make_shared( + SignedBlock{.message = block})); } } diff --git a/src/modules/shared/macro.hpp b/src/modules/shared/macro.hpp new file mode 100644 index 0000000..3708504 --- /dev/null +++ b/src/modules/shared/macro.hpp @@ -0,0 +1,70 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +/** + * Macro to work with subscriptions and messages. + * + * struct Message { ... }; + * + * struct IModule { + * // Send `Message` to subscription + * VIRTUAL_DISPATCH(Message); + * + * // Received `Message` from subscription + * VIRTUAL_ON_DISPATCH(Message); + * }; + * struct Module : IModule { + * ON_DISPATCH_SUBSCRIPTION(Message); + * + * void start() { + * ON_DISPATCH_SUBSCRIBE(Message); + * } + * + * DISPATCH_OVERRIDE(Message) { + * log("dispatch Message"); + * dispatchDerive(subscription, message); + * } + * + * ON_DISPATCH_OVERRIDE(Message); + * }; + * ON_DISPATCH_IMPL(Module, Message) { + * log("received Message"); + * } + */ + +#define VIRTUAL_DISPATCH(T) \ + virtual void dispatch_##T(std::shared_ptr message) = 0 +#define DISPATCH_OVERRIDE(T) \ + void dispatch_##T(std::shared_ptr message) override + +#define VIRTUAL_ON_DISPATCH(T) \ + virtual void on_dispatch_##T(std::shared_ptr message) = 0 +#define ON_DISPATCH_OVERRIDE(T) \ + void on_dispatch_##T(std::shared_ptr message) override +#define ON_DISPATCH_IMPL(C, T) \ + void C::on_dispatch_##T(std::shared_ptr message) + + +#define ON_DISPATCH_SUBSCRIPTION(T) \ + std::shared_ptr< \ + BaseSubscriber>> \ + on_dispatch_subscription_##T +#define ON_DISPATCH_SUBSCRIBE(T) \ + on_dispatch_subscription_##T = \ + se::SubscriberCreator>:: \ + create(*se_manager_, \ + SubscriptionEngineHandlers::kTest, \ + DeriveEventType::get(), \ + [module_internal]( \ + std::nullptr_t, \ + const std::shared_ptr &msg) { \ + if (auto m = module_internal.lock()) { \ + m->on_dispatch_##T(msg); \ + } \ + }) diff --git a/src/modules/shared/networking_types.tmp.hpp b/src/modules/shared/networking_types.tmp.hpp index d5c7f30..ccdc9be 100644 --- a/src/modules/shared/networking_types.tmp.hpp +++ b/src/modules/shared/networking_types.tmp.hpp @@ -6,44 +6,44 @@ #pragma once -#include "types/block.hpp" -#include "types/block_header.hpp" -#include "types/types.hpp" -#include "utils/request_id.hpp" +#include + +#include "types/signed_block.hpp" +#include "types/signed_vote.hpp" +#include "types/status_message.hpp" namespace lean::messages { + template + struct NotificationReceived { + libp2p::PeerId from_peer; + Notification notification; + }; + + template + struct GossipNotificationReceived { + Notification notification; + }; + + template + struct BroadcastNotification { + Notification notification; + }; struct PeerConnectedMessage { - PeerId peer; + libp2p::PeerId peer; // address? // initial view? }; struct PeerDisconnectedMessage { - PeerId peer; + libp2p::PeerId peer; // reason? }; - struct BlockAnnounce { - BlockHeader header; - PeerId peer; - }; - - struct BlockAnnounceMessage { - BlockAnnounce header; - PeerId peer; - }; + using StatusMessageReceived = NotificationReceived; - struct BlockRequestMessage { - RequestCxt ctx; - // BlocksRequest request; - PeerId peer; - }; - - struct BlockResponseMessage { - RequestCxt ctx; - outcome::result result; - PeerId peer; - }; + using SendSignedBlock = BroadcastNotification; + using SendSignedVote = BroadcastNotification; + using SignedVoteReceived = GossipNotificationReceived; } // namespace lean::messages diff --git a/src/modules/shared/synchronizer_types.tmp.hpp b/src/modules/shared/synchronizer_types.tmp.hpp index 5f72456..a68e4f7 100644 --- a/src/modules/shared/synchronizer_types.tmp.hpp +++ b/src/modules/shared/synchronizer_types.tmp.hpp @@ -6,13 +6,15 @@ #pragma once +#include + #include "types/types.hpp" namespace lean::messages { struct BlockDiscoveredMessage { BlockIndex index; - PeerId peer; + libp2p::PeerId peer; }; } // namespace lean::messages diff --git a/src/modules/synchronizer/interfaces.hpp b/src/modules/synchronizer/interfaces.hpp index 2e1c485..6504082 100644 --- a/src/modules/synchronizer/interfaces.hpp +++ b/src/modules/synchronizer/interfaces.hpp @@ -13,27 +13,15 @@ namespace lean::modules { struct SynchronizerLoader { virtual ~SynchronizerLoader() = default; - - virtual void dispatch_block_request( - std::shared_ptr msg) = 0; }; struct Synchronizer { virtual ~Synchronizer() = default; virtual void on_loaded_success() = 0; - /// New block discovered by block announce - /// Expected from a network subsystem - virtual void on_block_announce( - std::shared_ptr msg) = 0; - /// New block discovered (i.e., by peer's state view update) virtual void on_block_index_discovered( std::shared_ptr msg) = 0; - - /// BlockResponse has received - virtual void on_block_response( - std::shared_ptr msg) = 0; }; } // namespace lean::modules diff --git a/src/modules/synchronizer/synchronizer.cpp b/src/modules/synchronizer/synchronizer.cpp index 049fe50..d2cabf6 100644 --- a/src/modules/synchronizer/synchronizer.cpp +++ b/src/modules/synchronizer/synchronizer.cpp @@ -27,35 +27,5 @@ namespace lean::modules { void SynchronizerImpl::on_block_index_discovered( std::shared_ptr msg) { SL_INFO(logger_, "Block discovered"); - }; - - void SynchronizerImpl::on_block_announce( - std::shared_ptr msg) { - SL_INFO(logger_, "Block announced"); - - // tmp - static const size_t s = reinterpret_cast(this); - static size_t n = 0; - auto x = std::make_shared( - messages::BlockRequestMessage{.ctx = {{s, ++n}}}); - - // block_response_callbacks_.emplace(x->ctx.rid, [&](auto& msg) { - // SL_INFO(logger_, "Block response has been handled; rid={}", - // msg->ctx.rid); - // }); - loader_.dispatch_block_request(std::move(x)); - }; - - void SynchronizerImpl::on_block_response( - std::shared_ptr msg) { - auto it = block_response_callbacks_.find(msg->ctx.rid); - if (it == block_response_callbacks_.end()) { - SL_TRACE(logger_, "Received a response to someone else's request"); - return; - } - - SL_INFO(logger_, "Block response is received; rid={}", msg->ctx.rid); - // it->second(msg); } - } // namespace lean::modules diff --git a/src/modules/synchronizer/synchronizer.hpp b/src/modules/synchronizer/synchronizer.hpp index a55a25e..168c896 100644 --- a/src/modules/synchronizer/synchronizer.hpp +++ b/src/modules/synchronizer/synchronizer.hpp @@ -28,21 +28,9 @@ namespace lean::modules { void on_block_index_discovered( std::shared_ptr msg) override; - void on_block_announce( - std::shared_ptr msg) override; - - void on_block_response( - std::shared_ptr msg) override; - private: SynchronizerLoader &loader_; log::Logger logger_; - - std::unordered_map< - RequestId, - std::function msg)>> - block_response_callbacks_; }; } // namespace lean::modules diff --git a/src/qtils/to_shared_ptr.hpp b/src/qtils/to_shared_ptr.hpp new file mode 100644 index 0000000..d3b54d6 --- /dev/null +++ b/src/qtils/to_shared_ptr.hpp @@ -0,0 +1,19 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace qtils { + /** + * Make `shared_ptr` from object of type `T`. + */ + template + auto toSharedPtr(T &&t) { + return std::make_shared>(std::forward(t)); + } +} // namespace qtils diff --git a/src/se/subscription.hpp b/src/se/subscription.hpp index 57bd972..7926faa 100644 --- a/src/se/subscription.hpp +++ b/src/se/subscription.hpp @@ -77,24 +77,33 @@ namespace lean::se { * * @return A @c std::shared_ptr to the created @ref BaseSubscriber. */ - template + template static auto create(Subscription &se, SubscriptionEngineHandlers tid, + EventTypes key, F &&callback, Args &&...args) { auto subscriber = BaseSubscriber::create( se.getEngine(), std::forward(args)...); subscriber->setCallback( - [f{std::forward(callback)}](auto /*set_id*/, - auto &context, - auto event_key, - EventData... args) mutable { + [key, f{std::forward(callback)}](auto /*set_id*/, + auto &context, + auto event_key, + EventData... args) mutable { assert(key == event_key); std::forward(f)(context, std::move(args)...); }); subscriber->subscribe(0, key, static_cast(tid)); return subscriber; } + template + static auto create(Subscription &se, + SubscriptionEngineHandlers tid, + F &&callback, + Args &&...args) { + return create( + se, tid, key, std::forward(callback), std::forward(args)...); + } }; } // namespace lean::se diff --git a/src/se/subscription_fwd.hpp b/src/se/subscription_fwd.hpp index 0edae2e..5b1ba46 100644 --- a/src/se/subscription_fwd.hpp +++ b/src/se/subscription_fwd.hpp @@ -7,6 +7,7 @@ #pragma once #include +#include namespace lean { enum class SubscriptionEngineHandlers { @@ -83,8 +84,39 @@ namespace lean { /// New slot started SlotStarted, + + /// Used by `DeriveEventType::get` + Derive, + }; + + /** + * Get `EventType` auto-assigned to type `T`. + */ + class DeriveEventType { + public: + template + static EventTypes get() { + static auto type = static_cast( + std::to_underlying(EventTypes::Derive) + nextIndex()); + return type; + } + + private: + static size_t nextIndex() { + static size_t index = 0; + return index++; + } }; + /** + * Call `notify` with `EventType` auto-assigned to type `T`. + */ + template + void dispatchDerive(auto &subscription, const std::shared_ptr &message) { + subscription.notify(DeriveEventType::get>(), + message); + } + static constexpr uint32_t kThreadPoolSize = 3u; namespace se { diff --git a/src/serde/CMakeLists.txt b/src/serde/CMakeLists.txt new file mode 100644 index 0000000..3294e41 --- /dev/null +++ b/src/serde/CMakeLists.txt @@ -0,0 +1,13 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_library(enr + enr.cpp +) +target_link_libraries(enr + cppcodec + p2p::libp2p +) diff --git a/src/serde/enr.cpp b/src/serde/enr.cpp new file mode 100644 index 0000000..ebafa0f --- /dev/null +++ b/src/serde/enr.cpp @@ -0,0 +1,278 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "serde/enr.hpp" + +#include + +#include +#include +#include + +namespace lean::rlp { + constexpr uint8_t kMaxPrefix1 = 55; + constexpr uint8_t kBytesPrefix1 = 0x80; + constexpr uint8_t kListPrefix1 = 0xc0; + + struct Decoder { + qtils::BytesIn input_; + + bool empty() const { + return input_.empty(); + } + + qtils::BytesIn _take(size_t n) { + assert(n <= input_.size()); + auto r = input_.first(n); + input_ = input_.subspan(n); + return r; + } + + template + static T _uint(qtils::BytesIn be) { + T v = 0; + for (auto &x : be) { + v = (v << 8) | x; + } + return v; + } + + template + qtils::BytesIn _bytes() { + constexpr auto base2 = base1 + kMaxPrefix1; + assert(base1 <= input_[0]); + if (input_[0] <= base2) { + auto n = input_[0] - base1; + _take(1); + return _take(n); + } + auto n1 = input_[0] - base2; + _take(1); + auto n2 = _uint(_take(n1)); + return _take(n2); + } + + bool is_list() const { + return not empty() and kListPrefix1 <= input_[0]; + } + + Decoder list() { + assert(not empty()); + return Decoder{_bytes()}; + } + + qtils::BytesIn bytes() { + assert(not empty()); + assert(input_[0] < kListPrefix1); + if (input_[0] < kBytesPrefix1) { + return _take(1); + } + return _bytes(); + } + + template + requires std::is_default_constructible_v + and requires(T t) { qtils::BytesOut{t}; } + T bytes_n() { + auto raw = bytes(); + T r; + assert(raw.size() == r.size()); + memcpy(r.data(), raw.data(), r.size()); + return r; + } + + template + qtils::ByteArr bytes_n() { + return bytes_n>(); + } + + std::string_view str() { + return qtils::byte2str(bytes()); + } + + void str(std::string_view expected) { + auto actual = str(); + assert(actual == expected); + } + + template + T uint() { + auto be = bytes(); + return _uint(be); + } + + void skip() { + if (is_list()) { + list(); + } else { + bytes(); + } + } + }; + + struct EncodeBuffer { + qtils::ByteArr<1 + 8> buffer_{}; + uint8_t size_ = 0; + + operator qtils::BytesIn() const { + return qtils::BytesIn{buffer_}.first(size_); + } + bool is_short() const { + return buffer_[0] < kBytesPrefix1; + } + template + void _uint(uint64_t v) { + auto n = sizeof(uint64_t) - std::countl_zero(v) / 8; + size_ = 1 + n; + buffer_[0] = base + n; + for (size_t i = 1; i < size_; ++i) { + buffer_[i] = v >> (8 * (n - i)); + } + } + template + void _bytes(size_t bytes) { + if (bytes <= kMaxPrefix1) { + size_ = 1; + buffer_[0] = base + bytes; + } else { + _uint(bytes); + } + } + }; + + struct EncodeUint : EncodeBuffer { + EncodeUint(uint64_t v) { + if (0 < v and v < kBytesPrefix1) { + size_ = 1; + buffer_[0] = v; + } else { + _uint(v); + } + } + }; + + struct EncodeBytes : EncodeBuffer { + EncodeBytes(qtils::BytesIn bytes) { + if (bytes.size() == 1 and bytes[0] < kBytesPrefix1) { + size_ = 1; + buffer_[0] = bytes[0]; + } else { + _bytes(bytes.size()); + } + } + }; + + struct EncodeList : EncodeBuffer { + EncodeList(size_t bytes) { + _bytes(bytes); + } + }; + + struct Encoder { + qtils::ByteVec output_; + + void uint(uint64_t v) { + output_.put(EncodeUint{v}); + } + + void bytes(qtils::BytesIn bytes) { + EncodeBytes prefix{bytes}; + output_.put(prefix); + if (not prefix.is_short()) { + output_.put(bytes); + } + } + + void str(std::string_view s) { + bytes(qtils::str2byte(s)); + } + + qtils::ByteVec list() { + qtils::ByteVec r; + r.put(EncodeList{output_.size()}); + r.put(output_); + return r; + } + }; +} // namespace lean::rlp + +namespace lean::enr { + libp2p::PeerId Enr::peerId() const { + return libp2p::peerIdFromSecp256k1(public_key); + } + + libp2p::Multiaddress Enr::listenAddress() const { + return libp2p::Multiaddress::create( + std::format("/ip4/0.0.0.0/udp/{}/quic-v1", port.value())) + .value(); + } + + libp2p::Multiaddress Enr::connectAddress() const { + auto &ip = this->ip.value(); + return libp2p::Multiaddress::create( + std::format("/ip4/{}.{}.{}.{}/udp/{}/quic-v1/p2p/{}", + ip[3], + ip[2], + ip[1], + ip[0], + port.value(), + peerId().toBase58())) + .value(); + } + + libp2p::PeerInfo Enr::connectInfo() const { + return {peerId(), {connectAddress()}}; + } + + Enr decode(std::string_view str) { + constexpr std::string_view s_enr{"enr:"}; + assert(str.starts_with(s_enr)); + str.remove_prefix(s_enr.size()); + auto rlp_bytes = cppcodec::base64_url_unpadded::decode(str); + rlp::Decoder rlp{rlp_bytes}; + rlp = rlp.list(); + Enr enr; + enr.signature = rlp.bytes_n(); + enr.sequence = rlp.uint(); + std::string_view key; + key = rlp.str(); + while (key != "id") { + rlp.skip(); + key = rlp.str(); + } + assert(key == "id"); + rlp.str("v4"); + key = rlp.str(); + if (key == "ip") { + enr.ip = rlp.bytes_n(); + key = rlp.str(); + } + assert(key == "secp256k1"); + enr.public_key = rlp.bytes_n(); + if (not rlp.empty()) { + rlp.str("udp"); + enr.port = rlp.uint(); + } + assert(rlp.empty()); + return enr; + } + + std::string encode(const Secp256k1PublicKey &public_key, Port port) { + Enr enr{Secp256k1Signature{}, 1, public_key, Ip{1, 0, 0, 127}, port}; + rlp::Encoder rlp; + rlp.bytes(enr.signature); + rlp.uint(enr.sequence); + rlp.str("id"); + rlp.str("v4"); + rlp.str("ip"); + rlp.bytes(enr.ip.value()); + rlp.str("secp256k1"); + rlp.bytes(enr.public_key); + rlp.str("udp"); + rlp.uint(enr.port.value()); + return "enr:" + cppcodec::base64_url_unpadded::encode(rlp.list()); + } +} // namespace lean::enr diff --git a/src/serde/enr.hpp b/src/serde/enr.hpp new file mode 100644 index 0000000..ce98780 --- /dev/null +++ b/src/serde/enr.hpp @@ -0,0 +1,38 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include +#include + +namespace lean::enr { + using Secp256k1Signature = qtils::ByteArr<64>; + using Sequence = uint64_t; + using Secp256k1PublicKey = qtils::ByteArr<33>; + using Ip = qtils::ByteArr<4>; + using Port = uint16_t; + + struct Enr { + Secp256k1Signature signature; + Sequence sequence; + Secp256k1PublicKey public_key; + std::optional ip; + std::optional port; + + libp2p::PeerId peerId() const; + libp2p::Multiaddress listenAddress() const; + libp2p::Multiaddress connectAddress() const; + libp2p::PeerInfo connectInfo() const; + }; + + Enr decode(std::string_view str); + + std::string encode(const Secp256k1PublicKey &public_key, Port port); +} // namespace lean::enr diff --git a/src/serde/serialization.hpp b/src/serde/serialization.hpp index 29bc561..3b9b016 100644 --- a/src/serde/serialization.hpp +++ b/src/serde/serialization.hpp @@ -57,4 +57,11 @@ namespace lean { } } + auto sszHash(const auto &v) { + auto hash1 = ssz::hash_tree_root(v); + qtils::ByteArr<32> hash2; + static_assert(hash1.size() == hash2.size()); + memcpy(hash2.data(), hash1.data(), hash1.size()); + return hash2; + } } // namespace lean diff --git a/src/serde/snappy.hpp b/src/serde/snappy.hpp new file mode 100644 index 0000000..5553a65 --- /dev/null +++ b/src/serde/snappy.hpp @@ -0,0 +1,54 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include + +namespace lean { + enum class SnappyError { + UNCOMPRESS_TOO_LONG, + UNCOMPRESS_INVALID, + }; + Q_ENUM_ERROR_CODE(SnappyError) { + using E = decltype(e); + switch (e) { + case E::UNCOMPRESS_TOO_LONG: + return "SnappyError::UNCOMPRESS_TOO_LONG"; + case E::UNCOMPRESS_INVALID: + return "SnappyError::UNCOMPRESS_INVALID"; + } + abort(); + } + + qtils::ByteVec snappyCompress(qtils::BytesIn input) { + std::string compressed; + snappy::Compress(qtils::byte2str(input.data()), input.size(), &compressed); + return qtils::ByteVec{qtils::str2byte(std::as_const(compressed))}; + } + + outcome::result snappyUncompress(qtils::BytesIn compressed, + size_t max_size = 4 << 20) { + size_t size = 0; + if (not snappy::GetUncompressedLength( + qtils::byte2str(compressed.data()), compressed.size(), &size)) { + return SnappyError::UNCOMPRESS_INVALID; + } + if (size > max_size) { + return SnappyError::UNCOMPRESS_TOO_LONG; + } + std::string uncompressed; + if (not snappy::Uncompress(qtils::byte2str(compressed.data()), + compressed.size(), + &uncompressed)) { + return SnappyError::UNCOMPRESS_INVALID; + } + return qtils::ByteVec{qtils::str2byte(std::as_const(uncompressed))}; + } +} // namespace lean diff --git a/src/types/block.hpp b/src/types/block.hpp index d9b689d..d759f0e 100644 --- a/src/types/block.hpp +++ b/src/types/block.hpp @@ -6,18 +6,42 @@ #pragma once -#include - #include "types/block_body.hpp" +#include "types/block_header.hpp" namespace lean { - - struct Block { + struct Block : ssz::ssz_container { uint64_t slot; uint64_t proposer_index; qtils::ByteArr<32> parent_root; qtils::ByteArr<32> state_root; BlockBody body; - }; + SSZ_CONT(slot, proposer_index, parent_root, state_root, body); + + BlockHeader getHeader() const { + BlockHeader header; + header.slot = slot; + header.proposer_index = proposer_index; + header.parent_root = parent_root; + header.state_root = state_root; + header.body_root = sszHash(body); + return header; + } + + std::optional hash_cached; + const BlockHash &hash() const { + return hash_cached.value(); + } + void setHash() { + BOOST_ASSERT(not hash_cached.has_value()); + auto header = getHeader(); + header.updateHash(); + hash_cached = header.hash(); + } + + BlockIndex slotHash() const { + return {slot, hash()}; + } + }; } // namespace lean diff --git a/src/types/block_hash.hpp b/src/types/block_hash.hpp new file mode 100644 index 0000000..105b371 --- /dev/null +++ b/src/types/block_hash.hpp @@ -0,0 +1,13 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace lean { + using BlockHash = qtils::ByteArr<32>; +} // namespace lean diff --git a/src/types/block_header.hpp b/src/types/block_header.hpp index c1fe152..e316538 100644 --- a/src/types/block_header.hpp +++ b/src/types/block_header.hpp @@ -6,8 +6,6 @@ #pragma once -#include -#include #include #include @@ -50,21 +48,12 @@ namespace lean { return hash_opt.value(); } - void updateHash(const crypto::Hasher &hasher) const { - auto enc_res = encode(*this); - BOOST_ASSERT_MSG(enc_res.has_value(), - "Header should be encoded errorless"); - hash_opt.emplace(hasher.sha2_256(enc_res.value())); + void updateHash() const { + hash_opt = sszHash(*this); } BlockIndex index() const { return {slot, hash()}; } }; - - inline void calculateBlockHash(const BlockHeader &header, - const crypto::Hasher &hasher) { - header.updateHash(hasher); - } - } // namespace lean diff --git a/src/types/block_request.hpp b/src/types/block_request.hpp new file mode 100644 index 0000000..230f52e --- /dev/null +++ b/src/types/block_request.hpp @@ -0,0 +1,23 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "types/signed_block.hpp" + +namespace lean { + struct BlockRequest : ssz::ssz_container { + ssz::list blocks; + + SSZ_CONT(blocks); + }; + + struct BlockResponse : ssz::ssz_container { + ssz::list blocks; + + SSZ_CONT(blocks); + }; +} // namespace lean diff --git a/src/types/chekpoint.hpp b/src/types/checkpoint.hpp similarity index 100% rename from src/types/chekpoint.hpp rename to src/types/checkpoint.hpp diff --git a/src/types/signed_block.hpp b/src/types/signed_block.hpp index cd7024c..4996307 100644 --- a/src/types/signed_block.hpp +++ b/src/types/signed_block.hpp @@ -12,9 +12,11 @@ namespace lean { - struct SignedBlock { + struct SignedBlock : ssz::ssz_container { Block message; qtils::ByteArr<32> signature; + + SSZ_CONT(message, signature); }; } // namespace lean diff --git a/src/types/signed_vote.hpp b/src/types/signed_vote.hpp index 6501a47..2443c95 100644 --- a/src/types/signed_vote.hpp +++ b/src/types/signed_vote.hpp @@ -6,14 +6,18 @@ #pragma once +#include "types/vote.hpp" + namespace lean { - struct SignedVote { + struct SignedVote : ssz::ssz_container { Vote data; /// @note The signature type is still to be determined so Bytes32 is used in /// the interim. The actual signature size is expected to be a lot larger /// (~3 KiB). qtils::ByteArr<32> signature; + + SSZ_CONT(data, signature); }; } // namespace lean diff --git a/src/types/slot.hpp b/src/types/slot.hpp new file mode 100644 index 0000000..c511811 --- /dev/null +++ b/src/types/slot.hpp @@ -0,0 +1,13 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace lean { + using Slot = uint64_t; +} // namespace lean diff --git a/src/types/status_message.hpp b/src/types/status_message.hpp new file mode 100644 index 0000000..e06fd15 --- /dev/null +++ b/src/types/status_message.hpp @@ -0,0 +1,19 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "types/checkpoint.hpp" + +namespace lean { + // https://github.com/leanEthereum/leanSpec/blob/main/docs/client/networking.md#status-v1 + struct StatusMessage : ssz::ssz_container { + Checkpoint finalized; + Checkpoint head; + + SSZ_CONT(finalized, head); + }; +} // namespace lean diff --git a/src/types/types.hpp b/src/types/types.hpp index 5d7e670..06810e6 100644 --- a/src/types/types.hpp +++ b/src/types/types.hpp @@ -11,6 +11,9 @@ #include #include +#include "types/block_hash.hpp" +#include "types/slot.hpp" + namespace lean { // stub types. must be refactored in future @@ -20,45 +23,13 @@ namespace lean { using OpaqueHash = qtils::ByteArr<32>; - using BlockHash = OpaqueHash; using HeaderHash = OpaqueHash; using StateRoot = OpaqueHash; using BodyRoot = OpaqueHash; - using Slot = uint64_t; using Epoch = uint64_t; // is needed? using ProposerIndex = uint64_t; - - // networking types - - using PeerId = qtils::Tagged; // STUB - - // /// Direction, in which to retrieve ordered data - // enum class Direction : uint8_t { - // /// from child to parent - // ASCENDING = 0, - // /// from parent to canonical child - // DESCENDING = 1 - // }; - // - // /// Request for blocks to another peer - // struct BlocksRequest { - // /// start from this block - // BlockIndex from{}; - // /// sequence direction - // Direction direction{}; - // /// maximum number of blocks to return; an implementation defined maximum - // is - // /// used when unspecified - // std::optional max{}; - // bool multiple_justifications = true; - // }; - // - // struct BlockAnnounce { - // BlockAnnounce(const BlockAnnounce &) = delete; - // }; - } // namespace lean template <> diff --git a/src/types/vote.hpp b/src/types/vote.hpp index 03662f9..e18dd48 100644 --- a/src/types/vote.hpp +++ b/src/types/vote.hpp @@ -6,7 +6,7 @@ #pragma once -#include +#include namespace lean { diff --git a/src/utils/__debug_env.hpp b/src/utils/__debug_env.hpp new file mode 100644 index 0000000..6f23f6b --- /dev/null +++ b/src/utils/__debug_env.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include +#include + +// TODO(turuslan): config +inline size_t getPeerIndex() { + static size_t i = [] { + if (auto s = getenv("PeerIndex")) { + return std::stoul(s); + } else { + return 0ul; + } + }(); + return i; +} +inline size_t getValidatorCount() { + static size_t i = [] { + if (auto s = getenv("ValidatorCount")) { + return std::stoul(s); + } else { + return 1ul; + } + }(); + return i; +} diff --git a/src/utils/sample_peer.hpp b/src/utils/sample_peer.hpp new file mode 100644 index 0000000..f1cbf34 --- /dev/null +++ b/src/utils/sample_peer.hpp @@ -0,0 +1,25 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include "serde/enr.hpp" + +namespace lean { + struct SamplePeer : libp2p::SamplePeer { + SamplePeer(size_t index) : libp2p::SamplePeer{makeSecp256k1(index)} {} + + std::string enr() const { + enr::Secp256k1PublicKey public_key; + assert(keypair.publicKey.data.size() == public_key.size()); + memcpy( + public_key.data(), keypair.publicKey.data.data(), public_key.size()); + return enr::encode(public_key, port); + } + }; +} // namespace lean diff --git a/tests/unit/blockchain/block_storage_test.cpp b/tests/unit/blockchain/block_storage_test.cpp index b97d91a..d9e40d8 100644 --- a/tests/unit/blockchain/block_storage_test.cpp +++ b/tests/unit/blockchain/block_storage_test.cpp @@ -231,8 +231,8 @@ TEST_F(BlockStorageTest, PutWithStorageError) { BlockHeader header; header.slot = 1; header.parent_root = genesis_block_hash; - header.body_root = {};//ssz::hash_tree_root(body); - header.updateHash(*hasher); + header.body_root = {}; // ssz::hash_tree_root(body); + header.updateHash(); BlockData block; block.header.emplace(header); @@ -240,11 +240,7 @@ TEST_F(BlockStorageTest, PutWithStorageError) { block.header->parent_root = genesis_block_hash; block.body.emplace(body); - auto encoded_header = ByteVec(encode(*block.header).value()); - ON_CALL(*hasher, sha2_256(encoded_header.view())) - .WillByDefault(Return(regular_block_hash)); - - ByteVec key{regular_block_hash}; + ByteVec key{block.header->hash()}; EXPECT_CALL(*spaces[Space::Body], put(key.view(), _)) .WillOnce(Return(lean::storage::StorageError::IO_ERROR)); diff --git a/vcpkg-overlay/cppcodec.cmake b/vcpkg-overlay/cppcodec.cmake new file mode 100644 index 0000000..a4f6666 --- /dev/null +++ b/vcpkg-overlay/cppcodec.cmake @@ -0,0 +1,3 @@ +find_path(CPPCODEC_INCLUDE_DIRS "cppcodec/base32_crockford.hpp") +add_library(cppcodec INTERFACE) +target_include_directories(cppcodec INTERFACE ${CPPCODEC_INCLUDE_DIRS}) diff --git a/vcpkg-overlay/leanp2p/portfile.cmake b/vcpkg-overlay/leanp2p/portfile.cmake new file mode 100644 index 0000000..da27cd1 --- /dev/null +++ b/vcpkg-overlay/leanp2p/portfile.cmake @@ -0,0 +1,10 @@ +vcpkg_check_linkage(ONLY_STATIC_LIBRARY) +vcpkg_from_github( + OUT_SOURCE_PATH SOURCE_PATH + REPO qdrvm/leanp2p + REF 29f1937bf74f2d9dfddcc3774d670240d131b9d4 + SHA512 66a4a9e329aadb031dc7274aa73c358b4fb2372af7d0a86043584571a65e7951454f78c7a3955cbe05dda9fbac3587a1108b3bdf63390e130d5ea7e806e59499 +) +vcpkg_cmake_configure(SOURCE_PATH "${SOURCE_PATH}") +vcpkg_cmake_install() +file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/include") diff --git a/vcpkg-overlay/leanp2p/vcpkg.json b/vcpkg-overlay/leanp2p/vcpkg.json new file mode 100644 index 0000000..5f60445 --- /dev/null +++ b/vcpkg-overlay/leanp2p/vcpkg.json @@ -0,0 +1,24 @@ +{ + "name": "leanp2p", + "version": "0.1.0", + "dependencies": [ + "vcpkg-cmake", + "vcpkg-cmake-config", + + "boost-asio", + "boost-beast", + "boost-di", + "boost-filesystem", + "boost-program-options", + "boost-random", + "boost-signals2", + "fmt", + "liblsquic", + "libsecp256k1", + "protobuf", + "qtils", + "soralog", + "tsl-hat-trie", + "zlib" + ] +} diff --git a/vcpkg-overlay/liblsquic/disable-asan.patch b/vcpkg-overlay/liblsquic/disable-asan.patch new file mode 100644 index 0000000..2b05d0e --- /dev/null +++ b/vcpkg-overlay/liblsquic/disable-asan.patch @@ -0,0 +1,23 @@ +diff --git a/CMakeLists.txt b/CMakeLists.txt +index 65c4776..5d4086a 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -60,12 +60,12 @@ ENDIF() + + IF(CMAKE_BUILD_TYPE STREQUAL "Debug") + SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -O0 -g3") +- IF(CMAKE_C_COMPILER MATCHES "clang" AND +- NOT "$ENV{TRAVIS}" MATCHES "^true$" AND +- NOT "$ENV{EXTRA_CFLAGS}" MATCHES "-fsanitize") +- SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -fsanitize=address") +- SET(LIBS ${LIBS} -fsanitize=address) +- ENDIF() ++ # IF(CMAKE_C_COMPILER MATCHES "clang" AND ++ # NOT "$ENV{TRAVIS}" MATCHES "^true$" AND ++ # NOT "$ENV{EXTRA_CFLAGS}" MATCHES "-fsanitize") ++ # SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -fsanitize=address") ++ # SET(LIBS ${LIBS} -fsanitize=address) ++ # ENDIF() + # Uncomment to enable cleartext protocol mode (no crypto): + #SET (MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -DLSQUIC_ENABLE_HANDSHAKE_DISABLE=1") + ELSE() diff --git a/vcpkg-overlay/liblsquic/fix-found-boringssl.patch b/vcpkg-overlay/liblsquic/fix-found-boringssl.patch new file mode 100644 index 0000000..a3a632c --- /dev/null +++ b/vcpkg-overlay/liblsquic/fix-found-boringssl.patch @@ -0,0 +1,53 @@ +diff --git a/CMakeLists.txt b/CMakeLists.txt +index 5d4086a..e085a83 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -120,10 +120,12 @@ IF(CMAKE_BUILD_TYPE STREQUAL "Debug") + SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -Od") + #SET (MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -DFIU_ENABLE=1") + #SET(LIBS ${LIBS} fiu) ++ SET(LIB_NAME ssld cryptod) + ELSE() + SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -Ox") + # Comment out the following line to compile out debug messages: + #SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -DLSQUIC_LOWEST_LOG_LEVEL=LSQ_LOG_INFO") ++ SET(LIB_NAME ssl crypto) + ENDIF() + + ENDIF() #MSVC +@@ -191,7 +193,7 @@ IF (NOT DEFINED BORINGSSL_LIB AND DEFINED BORINGSSL_DIR) + ELSE() + + +- FOREACH(LIB_NAME ssl crypto) ++ FOREACH(LIB ${LIB_NAME}) + # If BORINGSSL_LIB is defined, try find each lib. Otherwise, user should define BORINGSSL_LIB_ssl, + # BORINGSSL_LIB_crypto and so on explicitly. For example, including boringssl and lsquic both via + # add_subdirectory: +@@ -201,20 +203,20 @@ ELSE() + # add_subdirectory(third_party/lsquic) + IF (DEFINED BORINGSSL_LIB) + IF (CMAKE_SYSTEM_NAME STREQUAL Windows) +- FIND_LIBRARY(BORINGSSL_LIB_${LIB_NAME} +- NAMES ${LIB_NAME} ++ FIND_LIBRARY(BORINGSSL_LIB_${LIB} ++ NAMES ${LIB} + PATHS ${BORINGSSL_LIB} + PATH_SUFFIXES Debug Release MinSizeRel RelWithDebInfo + NO_DEFAULT_PATH) + ELSE() +- FIND_LIBRARY(BORINGSSL_LIB_${LIB_NAME} +- NAMES lib${LIB_NAME}${LIB_SUFFIX} ++ FIND_LIBRARY(BORINGSSL_LIB_${LIB} ++ NAMES lib${LI}${LIB_SUFFIX} + PATHS ${BORINGSSL_LIB} +- PATH_SUFFIXES ${LIB_NAME} ++ PATH_SUFFIXES ${LIB} + NO_DEFAULT_PATH) + ENDIF() + ENDIF() +- IF(BORINGSSL_LIB_${LIB_NAME}) ++ IF(BORINGSSL_LIB_${LIB}) + MESSAGE(STATUS "Found ${LIB_NAME} library: ${BORINGSSL_LIB_${LIB_NAME}}") + ELSE() + MESSAGE(FATAL_ERROR "BORINGSSL_LIB_${LIB_NAME} library not found") diff --git a/vcpkg-overlay/liblsquic/lsquic_conn_ssl.patch b/vcpkg-overlay/liblsquic/lsquic_conn_ssl.patch new file mode 100644 index 0000000..ae7be54 --- /dev/null +++ b/vcpkg-overlay/liblsquic/lsquic_conn_ssl.patch @@ -0,0 +1,80 @@ +diff --git a/include/lsquic.h b/include/lsquic.h +index 389fbcc..c38d027 100644 +--- a/include/lsquic.h ++++ b/include/lsquic.h +@@ -1671,6 +1671,10 @@ int lsquic_stream_close(lsquic_stream_t *s); + int + lsquic_stream_has_unacked_data (lsquic_stream_t *s); + ++/* Return SSL object associated with this connection */ ++struct ssl_st * ++lsquic_conn_ssl(struct lsquic_conn *conn); ++ + /** + * Get certificate chain returned by the server. This can be used for + * server certificate verification. +diff --git a/src/liblsquic/lsquic_conn.c b/src/liblsquic/lsquic_conn.c +index f76550d..31e5285 100644 +--- a/src/liblsquic/lsquic_conn.c ++++ b/src/liblsquic/lsquic_conn.c +@@ -128,6 +128,12 @@ lsquic_conn_crypto_alg_keysize (const lsquic_conn_t *lconn) + } + + ++struct ssl_st * ++lsquic_conn_ssl(struct lsquic_conn *lconn) { ++ return lconn->cn_esf_c->esf_get_ssl(lconn->cn_enc_session); ++} ++ ++ + struct stack_st_X509 * + lsquic_conn_get_server_cert_chain (struct lsquic_conn *lconn) + { +diff --git a/src/liblsquic/lsquic_enc_sess.h b/src/liblsquic/lsquic_enc_sess.h +index f45c15f..3505fbd 100644 +--- a/src/liblsquic/lsquic_enc_sess.h ++++ b/src/liblsquic/lsquic_enc_sess.h +@@ -115,6 +115,9 @@ struct enc_session_funcs_common + (*esf_decrypt_packet)(enc_session_t *, struct lsquic_engine_public *, + const struct lsquic_conn *, struct lsquic_packet_in *); + ++ struct ssl_st * ++ (*esf_get_ssl)(enc_session_t *); ++ + struct stack_st_X509 * + (*esf_get_server_cert_chain) (enc_session_t *); + +diff --git a/src/liblsquic/lsquic_enc_sess_ietf.c b/src/liblsquic/lsquic_enc_sess_ietf.c +index 66329c1..076c4c5 100644 +--- a/src/liblsquic/lsquic_enc_sess_ietf.c ++++ b/src/liblsquic/lsquic_enc_sess_ietf.c +@@ -2519,6 +2519,13 @@ iquic_esf_global_cleanup (void) + } + + ++static struct ssl_st * ++iquic_esf_get_ssl(enc_session_t *enc_session_p) { ++ struct enc_sess_iquic *const enc_sess = enc_session_p; ++ return enc_sess->esi_ssl; ++} ++ ++ + static struct stack_st_X509 * + iquic_esf_get_server_cert_chain (enc_session_t *enc_session_p) + { +@@ -2744,6 +2751,7 @@ const struct enc_session_funcs_common lsquic_enc_session_common_ietf_v1 = + .esf_global_cleanup = iquic_esf_global_cleanup, + .esf_global_init = iquic_esf_global_init, + .esf_tag_len = IQUIC_TAG_LEN, ++ .esf_get_ssl = iquic_esf_get_ssl, + .esf_get_server_cert_chain + = iquic_esf_get_server_cert_chain, + .esf_get_sni = iquic_esf_get_sni, +@@ -2763,6 +2771,7 @@ const struct enc_session_funcs_common lsquic_enc_session_common_ietf_v1_no_flush + .esf_global_cleanup = iquic_esf_global_cleanup, + .esf_global_init = iquic_esf_global_init, + .esf_tag_len = IQUIC_TAG_LEN, ++ .esf_get_ssl = iquic_esf_get_ssl, + .esf_get_server_cert_chain + = iquic_esf_get_server_cert_chain, + .esf_get_sni = iquic_esf_get_sni, diff --git a/vcpkg-overlay/liblsquic/portfile.cmake b/vcpkg-overlay/liblsquic/portfile.cmake new file mode 100644 index 0000000..3602c59 --- /dev/null +++ b/vcpkg-overlay/liblsquic/portfile.cmake @@ -0,0 +1,78 @@ +if(VCPKG_TARGET_IS_WINDOWS) + # The lib uses CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS, at least until + # https://github.com/litespeedtech/lsquic/pull/371 or similar is merged + vcpkg_check_linkage(ONLY_STATIC_LIBRARY) +endif() + +vcpkg_from_github(OUT_SOURCE_PATH SOURCE_PATH + REPO litespeedtech/lsquic + REF v${VERSION} + SHA512 40d742779bfa2dc6fdaf0ee8e9349498d373dcffcc6dd27867c18d87309a288ea6811d693043b5d98364d816b818b49445214497475844201241193c0f37b349 + HEAD_REF master + PATCHES + disable-asan.patch + fix-found-boringssl.patch + lsquic_conn_ssl.patch +) + +# Submodules +vcpkg_from_github(OUT_SOURCE_PATH LSQPACK_SOURCE_PATH + REPO litespeedtech/ls-qpack + REF v2.5.3 + HEAD_REF master + SHA512 f90502c763abc84532f33d1b8f952aea7869e4e0c5f6bd344532ddd51c4a180958de4086d88b9ec96673a059c806eec9e70007651d4d4e1a73395919dee47ce0 +) +if(NOT EXISTS "${SOURCE_PATH}/src/ls-hpack/CMakeLists.txt") + file(REMOVE_RECURSE "${SOURCE_PATH}/src/liblsquic/ls-qpack") + file(RENAME "${LSQPACK_SOURCE_PATH}" "${SOURCE_PATH}/src/liblsquic/ls-qpack") +endif() + +vcpkg_from_github(OUT_SOURCE_PATH LSHPACK_SOURCE_PATH + REPO litespeedtech/ls-hpack + REF v2.3.2 + HEAD_REF master + SHA512 45d6c8296e8eee511e6a083f89460d5333fc9a49bc078dac55fdec6c46db199de9f150379f02e054571f954a5e3c79af3864dbc53dc57d10a8d2ed26a92d4278 +) +if(NOT EXISTS "${SOURCE_PATH}/src/lshpack/CMakeLists.txt") + file(REMOVE_RECURSE "${SOURCE_PATH}/src/lshpack") + file(RENAME "${LSHPACK_SOURCE_PATH}" "${SOURCE_PATH}/src/lshpack") +endif() + +# Configuration +vcpkg_find_acquire_program(PERL) + +string(COMPARE EQUAL "${VCPKG_LIBRARY_LINKAGE}" "dynamic" LSQUIC_SHARED_LIB) + +vcpkg_cmake_configure( + SOURCE_PATH "${SOURCE_PATH}" + OPTIONS + "-DPERL=${PERL}" + "-DPERL_EXECUTABLE=${PERL}" + "-DLSQUIC_SHARED_LIB=${LSQUIC_SHARED_LIB}" + "-DBORINGSSL_INCLUDE=${CURRENT_INSTALLED_DIR}/include" + -DLSQUIC_BIN=OFF + -DLSQUIC_TESTS=OFF + OPTIONS_RELEASE + "-DBORINGSSL_LIB=${CURRENT_INSTALLED_DIR}/lib" + OPTIONS_DEBUG + "-DBORINGSSL_LIB=${CURRENT_INSTALLED_DIR}/debug/lib" + -DLSQUIC_DEVEL=ON +) + +vcpkg_cmake_install() +if(VCPKG_TARGET_IS_WINDOWS) + # Upstream removed installation of this header after merging changes + file(INSTALL "${SOURCE_PATH}/wincompat/vc_compat.h" DESTINATION "${CURRENT_INSTALLED_DIR}/include/lsquic") +endif() + +vcpkg_cmake_config_fixup(PACKAGE_NAME lsquic) + +# Concatenate license files and install +vcpkg_install_copyright(FILE_LIST + "${SOURCE_PATH}/LICENSE" + "${SOURCE_PATH}/LICENSE.chrome" +) + +# Remove duplicated include directory +file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/include") + diff --git a/vcpkg-overlay/liblsquic/vcpkg.json b/vcpkg-overlay/liblsquic/vcpkg.json new file mode 100644 index 0000000..ec90032 --- /dev/null +++ b/vcpkg-overlay/liblsquic/vcpkg.json @@ -0,0 +1,25 @@ +{ + "name": "liblsquic", + "version": "3.3.2", + "port-version": 1, + "description": "An implementation of the QUIC and HTTP/3 protocols.", + "homepage": "https://github.com/litespeedtech/lsquic", + "license": "MIT AND BSD-3-Clause", + "supports": "!x86", + "dependencies": [ + "boringssl", + { + "name": "getopt", + "platform": "windows" + }, + { + "name": "vcpkg-cmake", + "host": true + }, + { + "name": "vcpkg-cmake-config", + "host": true + }, + "zlib" + ] +} diff --git a/vcpkg-overlay/libsecp256k1/portfile.cmake b/vcpkg-overlay/libsecp256k1/portfile.cmake new file mode 100644 index 0000000..bfbc93c --- /dev/null +++ b/vcpkg-overlay/libsecp256k1/portfile.cmake @@ -0,0 +1,11 @@ +vcpkg_check_linkage(ONLY_STATIC_LIBRARY) +vcpkg_from_github( + OUT_SOURCE_PATH SOURCE_PATH + REPO qdrvm/libsecp256k1 + REF 4370b9c336f86457c0b7bb97cd1ac6a281d951fa + SHA512 ed4660a66d8d74d5d5a27e54a247fe89d0fab4a5618ace27fe384690eebe296b89ababb7eb8a0184d64c339a27c7882306ecefb3fc8bf8c554ca3e244df627e5 +) +vcpkg_cmake_configure(SOURCE_PATH "${SOURCE_PATH}") +vcpkg_cmake_install() +#vcpkg_cmake_config_fixup(PACKAGE_NAME "libsecp256k1") +file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/include") diff --git a/vcpkg-overlay/libsecp256k1/vcpkg.json b/vcpkg-overlay/libsecp256k1/vcpkg.json new file mode 100644 index 0000000..8143946 --- /dev/null +++ b/vcpkg-overlay/libsecp256k1/vcpkg.json @@ -0,0 +1,8 @@ +{ + "name": "libsecp256k1", + "version": "0.5.1", + "dependencies": [ + { "name": "vcpkg-cmake", "host": true }, + { "name": "vcpkg-cmake-config", "host": true } + ] +} diff --git a/vcpkg-overlay/qtils/portfile.cmake b/vcpkg-overlay/qtils/portfile.cmake index fa16bea..5e21bab 100644 --- a/vcpkg-overlay/qtils/portfile.cmake +++ b/vcpkg-overlay/qtils/portfile.cmake @@ -2,10 +2,8 @@ vcpkg_check_linkage(ONLY_STATIC_LIBRARY) vcpkg_from_github( OUT_SOURCE_PATH SOURCE_PATH REPO qdrvm/qtils - REF refs/tags/v0.1.3 - SHA512 09e759f82ce273b602ec851ed7b5bb5fe1e0471a35a9874d04190946a38b8cf5dcea0923af91db798c53c01970c7bfe3452fb956efd66e067ecbec3e0c99fb02 - PATCHES - vcpkg.patch + REF 4eb3f8024817d66932cec0c52e74e127c137a78a + SHA512 c02b90803a1cbf09dcb0e4707c84b3afdc83449d12ad1771e2918a3cdb40b8d01bda4f93fcb50491e35593fd060ec53c8a4b0b425dbb3df936a32312e5b99859 ) vcpkg_cmake_configure(SOURCE_PATH "${SOURCE_PATH}") vcpkg_cmake_install() diff --git a/vcpkg-overlay/sszpp/compute_hashtree_size_inline.patch b/vcpkg-overlay/sszpp/compute_hashtree_size_inline.patch deleted file mode 100644 index 69947e4..0000000 --- a/vcpkg-overlay/sszpp/compute_hashtree_size_inline.patch +++ /dev/null @@ -1,13 +0,0 @@ -diff --git a/lib/merkleize.hpp b/lib/merkleize.hpp -index d64c089..c6d0012 100644 ---- a/lib/merkleize.hpp -+++ b/lib/merkleize.hpp -@@ -100,7 +100,7 @@ namespace _detail { - * It is faster to loop over computing the actual size than overestimating using logarithms and allocating unnecessary - * memory - */ --auto compute_hashtree_size(std::size_t chunk_count, std::size_t depth) { -+inline auto compute_hashtree_size(std::size_t chunk_count, std::size_t depth) { - std::size_t ret{}; - if (depth == 0) return BYTES_PER_CHUNK; - while (depth > 0) { diff --git a/vcpkg-overlay/sszpp/portfile.cmake b/vcpkg-overlay/sszpp/portfile.cmake index 8e0769a..09b05c4 100644 --- a/vcpkg-overlay/sszpp/portfile.cmake +++ b/vcpkg-overlay/sszpp/portfile.cmake @@ -5,7 +5,6 @@ vcpkg_from_github( SHA512 a5abea3ad6a1d706428886acfea2a8990623925d5488b23b86c027179282aed0b98928317e946cfb2cbc27e3e230550e6a728cd9e888c6e54b69752b503bf6c9 PATCHES vcpkg.patch - compute_hashtree_size_inline.patch ) set(VCPKG_BUILD_TYPE release) # header-only port vcpkg_cmake_configure( diff --git a/vcpkg-overlay/sszpp/vcpkg.patch b/vcpkg-overlay/sszpp/vcpkg.patch index 23fc722..738b9e6 100644 --- a/vcpkg-overlay/sszpp/vcpkg.patch +++ b/vcpkg-overlay/sszpp/vcpkg.patch @@ -119,6 +119,28 @@ index 3174735..b086711 100644 }); } +diff --git a/lib/container.hpp b/lib/container.hpp +index dc31773..3e42de7 100644 +--- a/lib/container.hpp ++++ b/lib/container.hpp +@@ -46,7 +46,7 @@ struct is_ssz_object : std::true_type {}; + // Serialization + #define SSZ_CONT(...) \ + constexpr std::size_t ssz_size() const noexcept { return ssz::compute_total_length(__VA_ARGS__); } \ +- constexpr void serialize(ssz::ssz_iterator auto result) const { ssz::serialize(result, __VA_ARGS__); } \ ++ constexpr void serialize(ssz::ssz_iterator auto result) const { ssz::serialize_container(result, __VA_ARGS__); } \ + constexpr void deserialize(const std::ranges::sized_range auto &bytes) { \ + ssz::deserialize_container(bytes, __VA_ARGS__); \ + } \ +@@ -75,7 +75,7 @@ constexpr std::uint32_t compute_total_length(const ssz_object auto &...members) + return (... + size_plus_placeholder(members)); + } + +-constexpr void serialize(ssz_iterator auto result, const ssz_object auto &...members) { ++constexpr void serialize_container(ssz_iterator auto result, const ssz_object auto &...members) { + auto fsize = compute_fixed_length(members...); + auto variable = result + fsize; + auto begin = result; diff --git a/lib/cxx23/ranges/chunk.hpp b/lib/cxx23/ranges/chunk.hpp new file mode 100644 index 0000000..4c73bd2 @@ -572,3 +594,16 @@ index aaf75ef..e2c596e 100644 for (size_t j = 0; j < CHAR_BIT && CHAR_BIT * i + j < N; ++j) if (std::to_integer(b >> j) & 1) ret.set(CHAR_BIT * i + j); +diff --git a/lib/merkleize.hpp b/lib/merkleize.hpp +index d64c089..c6d0012 100644 +--- a/lib/merkleize.hpp ++++ b/lib/merkleize.hpp +@@ -100,7 +100,7 @@ namespace _detail { + * It is faster to loop over computing the actual size than overestimating using logarithms and allocating unnecessary + * memory + */ +-auto compute_hashtree_size(std::size_t chunk_count, std::size_t depth) { ++inline auto compute_hashtree_size(std::size_t chunk_count, std::size_t depth) { + std::size_t ret{}; + if (depth == 0) return BYTES_PER_CHUNK; + while (depth > 0) { diff --git a/vcpkg-overlay/tsl-hat-trie/portfile.cmake b/vcpkg-overlay/tsl-hat-trie/portfile.cmake new file mode 100644 index 0000000..597fada --- /dev/null +++ b/vcpkg-overlay/tsl-hat-trie/portfile.cmake @@ -0,0 +1,11 @@ +vcpkg_check_linkage(ONLY_STATIC_LIBRARY) +vcpkg_from_github( + OUT_SOURCE_PATH SOURCE_PATH + REPO masterjedy/hat-trie + REF 4fdfc75e75276185eed4b748ea09671601101b8e + SHA512 1f8e216037d06909a80dc89550a667cb1a8c64270c91b0ea5585c98f318fdbfe863a9766c9fadfb3da581b248fcd6b6b13576a2f855c61b7587516c38947c457 +) +vcpkg_cmake_configure(SOURCE_PATH "${SOURCE_PATH}") +vcpkg_cmake_install() +#vcpkg_cmake_config_fixup(PACKAGE_NAME "tsl-hat-trie") +file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/include") diff --git a/vcpkg-overlay/tsl-hat-trie/vcpkg.json b/vcpkg-overlay/tsl-hat-trie/vcpkg.json new file mode 100644 index 0000000..0ad4d7b --- /dev/null +++ b/vcpkg-overlay/tsl-hat-trie/vcpkg.json @@ -0,0 +1,8 @@ +{ + "name": "tsl-hat-trie", + "version": "1.0.0", + "dependencies": [ + { "name": "vcpkg-cmake", "host": true }, + { "name": "vcpkg-cmake-config", "host": true } + ] +} diff --git a/vcpkg.json b/vcpkg.json index 1f82dc9..33a8a43 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -2,17 +2,19 @@ "name": "qlean-mini", "version": "0.0.1", "dependencies": [ - "qtils", + "boost-asio", + "boost-beast", "boost-di", "boost-program-options", "boost-property-tree", - "soralog", - "openssl", + "cppcodec", + "leanp2p", "prometheus-cpp", - "boost-asio", + "qtils", "rocksdb", - "sszpp", - "boost-beast" + "snappy", + "soralog", + "sszpp" ], "features": { "test": { "description": "Test", "dependencies": ["gtest"]}