Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions DEPRECATED.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
Module system uses subscriptions and messages.\
For example:

```c++
struct Message { ... };

struct IModule {
// Send `Message` to subscription
virtual void dispatchMessage(std::shared_ptr<const Message> message) = 0;

// Received `Message` from subscription
virtual void onMessage(std::shared_ptr<const Message> message) = 0;
};
struct Module : IModule {
SimpleSubscription<Message> subscription_message_;

void start() {
subscription_message_.subscribe(*se_manager_, module_internal);
}

void dispatchMessage(std::shared_ptr<const Message> message) override {
log("dispatch Message");
dispatchDerive(subscription, message);
}

void onMessage(std::shared_ptr<const Message> message) override;
};
void Module::onMessage(std::shared_ptr<const Message> message) {
log("received Message");
}
```

That code may look like boilerplate and duplicate message type name,\
but don't try to simplify it using macro.\
The following code is forbidden and will not pass PR review.

```c++
struct Message { ... };

struct IModule {
// Send `Message` to subscription
VIRTUAL_DISPATCH(Message);

// Received `Message` from subscription
VIRTUAL_ON_DISPATCH(Message);
};
struct Module : IModule {
ON_DISPATCH_SUBSCRIPTION(Message);

void start() {
ON_DISPATCH_SUBSCRIBE(Message);
}

DISPATCH_OVERRIDE(Message) {
log("dispatch Message");
dispatchDerive(subscription, message);
}

ON_DISPATCH_OVERRIDE(Message);
};
ON_DISPATCH_IMPL(Module, Message) {
log("received Message");
}
```

Don't try to preserve type name case in variable or member name.\
Yes, it breaks relation between type name and field name.\
Yes, you need to change case manually.\
The following code is forbidden and will not pass PR review.

```c++
SimpleSubscription<Message> subscription_Message_;
```

Don't try to separate type name in function name.\
The following code is forbidden and will not pass PR review.

```c++
void dispatch_Message(std::shared_ptr<const Message> message);
```
6 changes: 0 additions & 6 deletions src/blockchain/block_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,6 @@ namespace lean::blockchain {
*/
[[nodiscard]] virtual BlockIndex lastFinalized() const = 0;

/**
* Get message for "/leanconsensus/req/status/1/ssz_snappy" protocol.
* Returns hash and slot for finalized and best blocks.
*/
virtual StatusMessage getStatusMessage() const = 0;

/**
* Get `SignedBlock` for "/leanconsensus/req/blocks_by_root/1/ssz_snappy"
* protocol.
Expand Down
10 changes: 0 additions & 10 deletions src/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "types/block.hpp"
#include "types/block_header.hpp"
#include "types/signed_block.hpp"
#include "types/status_message.hpp"

namespace lean::blockchain {
BlockTreeImpl::SafeBlockTreeData::SafeBlockTreeData(BlockTreeData data)
Expand Down Expand Up @@ -787,15 +786,6 @@ namespace lean::blockchain {
[&](const BlockTreeData &p) { return getLastFinalizedNoLock(p); });
}

StatusMessage BlockTreeImpl::getStatusMessage() const {
auto finalized = lastFinalized();
auto head = bestBlock();
return StatusMessage{
.finalized = {.root = finalized.hash, .slot = finalized.slot},
.head = {.root = head.hash, .slot = head.slot},
};
}

outcome::result<std::optional<SignedBlock>> BlockTreeImpl::tryGetSignedBlock(
const BlockHash block_hash) const {
auto header_res = getBlockHeader(block_hash);
Expand Down
1 change: 0 additions & 1 deletion src/blockchain/impl/block_tree_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ namespace lean::blockchain {

BlockIndex lastFinalized() const override;

StatusMessage getStatusMessage() const override;
outcome::result<std::optional<SignedBlock>> tryGetSignedBlock(
const BlockHash block_hash) const override;
void import(std::vector<SignedBlock> blocks) override;
Expand Down
18 changes: 12 additions & 6 deletions src/loaders/impl/networking_loader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ namespace lean::loaders {

std::shared_ptr<BaseSubscriber<qtils::Empty>> on_loading_finished_;

ON_DISPATCH_SUBSCRIPTION(SendSignedBlock);
ON_DISPATCH_SUBSCRIPTION(SendSignedVote);
SimpleSubscription<messages::SendSignedBlock,
modules::Networking,
&modules::Networking::onSendSignedBlock>
subscription_send_signed_block_;
SimpleSubscription<messages::SendSignedVote,
modules::Networking,
&modules::Networking::onSendSignedVote>
subscription_send_signed_vote_;

public:
NetworkingLoader(std::shared_ptr<log::LoggingSystem> logsys,
Expand Down Expand Up @@ -89,8 +95,8 @@ namespace lean::loaders {
}
});

ON_DISPATCH_SUBSCRIBE(SendSignedBlock);
ON_DISPATCH_SUBSCRIBE(SendSignedVote);
subscription_send_signed_block_.subscribe(*se_manager_, module_internal);
subscription_send_signed_vote_.subscribe(*se_manager_, module_internal);

se_manager_->notify(lean::EventTypes::NetworkingIsLoaded);
}
Expand All @@ -107,7 +113,7 @@ namespace lean::loaders {
se_manager_->notify(lean::EventTypes::PeerDisconnected, msg);
}

void dispatch_StatusMessageReceived(
void dispatchStatusMessageReceived(
std::shared_ptr<const messages::StatusMessageReceived> message)
override {
SL_TRACE(logger_,
Expand All @@ -118,7 +124,7 @@ namespace lean::loaders {
dispatchDerive(*se_manager_, message);
}

void dispatch_SignedVoteReceived(
void dispatchSignedVoteReceived(
std::shared_ptr<const messages::SignedVoteReceived> message) override {
SL_TRACE(logger_, "Dispatch SignedVoteReceived");
dispatchDerive(*se_manager_, message);
Expand Down
2 changes: 1 addition & 1 deletion src/loaders/impl/production_loader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ namespace lean::loaders {
se_manager_->notify(EventTypes::BlockProduced, msg);
}

void dispatch_SendSignedBlock(
void dispatchSendSignedBlock(
std::shared_ptr<const messages::SendSignedBlock> message) override {
dispatchDerive(*se_manager_, message);
}
Expand Down
4 changes: 3 additions & 1 deletion src/modules/networking/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

add_lean_module(networking
SOURCE
block_request_protocol.cpp
networking.cpp
status_protocol.cpp
INCLUDE_DIRS
${CMAKE_SOURCE_DIR}
${CMAKE_SOURCE_DIR}/src
Expand All @@ -19,4 +21,4 @@ add_lean_module(networking
Snappy::snappy
soralog::soralog
sszpp
)
)
72 changes: 72 additions & 0 deletions src/modules/networking/block_request_protocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#include "modules/networking/block_request_protocol.hpp"

#include <libp2p/basic/read_varint.hpp>
#include <libp2p/basic/write_varint.hpp>
#include <libp2p/coro/spawn.hpp>
#include <libp2p/host/basic_host.hpp>

#include "blockchain/block_tree.hpp"
#include "modules/networking/ssz_snappy.hpp"

namespace lean::modules {
BlockRequestProtocol::BlockRequestProtocol(
std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<libp2p::host::BasicHost> host,
qtils::SharedRef<blockchain::BlockTree> block_tree)
: io_context_{std::move(io_context)},
host_{std::move(host)},
block_tree_{std::move(block_tree)} {}

libp2p::StreamProtocols BlockRequestProtocol::getProtocolIds() const {
return {"/leanconsensus/req/blocks_by_root/1/ssz_snappy"};
}

void BlockRequestProtocol::handle(std::shared_ptr<libp2p::Stream> stream) {
libp2p::coroSpawn(
*io_context_,
[self{shared_from_this()}, stream]() -> libp2p::Coro<void> {
std::ignore = co_await self->coroRespond(stream);
});
}

void BlockRequestProtocol::start() {
host_->listenProtocol(shared_from_this());
}

libp2p::CoroOutcome<BlockResponse> BlockRequestProtocol::request(
libp2p::PeerId peer_id, BlockRequest request) {
BOOST_OUTCOME_CO_TRY(auto stream,
co_await host_->newStream(peer_id, getProtocolIds()));
BOOST_OUTCOME_CO_TRY(
co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(request)));
qtils::ByteVec encoded;
BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded));
BOOST_OUTCOME_CO_TRY(auto response,
decodeSszSnappy<BlockResponse>(encoded));
co_return response;
}

libp2p::CoroOutcome<void> BlockRequestProtocol::coroRespond(
std::shared_ptr<libp2p::Stream> stream) {
qtils::ByteVec encoded;
BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded));
BOOST_OUTCOME_CO_TRY(auto request, decodeSszSnappy<BlockRequest>(encoded));
BlockResponse response;
for (auto &block_hash : request.blocks) {
BOOST_OUTCOME_CO_TRY(auto block,
block_tree_->tryGetSignedBlock(block_hash));
if (block.has_value()) {
response.blocks.push_back(std::move(block.value()));
}
}
BOOST_OUTCOME_CO_TRY(
co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(response)));
co_return outcome::success();
}
} // namespace lean::modules
54 changes: 54 additions & 0 deletions src/modules/networking/block_request_protocol.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <memory>

#include <libp2p/protocol/base_protocol.hpp>
#include <qtils/shared_ref.hpp>

#include "modules/networking/types.hpp"

namespace boost::asio {
class io_context;
} // namespace boost::asio

namespace libp2p::host {
class BasicHost;
} // namespace libp2p::host

namespace lean::blockchain {
class BlockTree;
} // namespace lean::blockchain

namespace lean::modules {
class BlockRequestProtocol
: public std::enable_shared_from_this<BlockRequestProtocol>,
public libp2p::protocol::BaseProtocol {
public:
BlockRequestProtocol(std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<libp2p::host::BasicHost> host,
qtils::SharedRef<blockchain::BlockTree> block_tree);

// BaseProtocol
libp2p::StreamProtocols getProtocolIds() const override;
void handle(std::shared_ptr<libp2p::Stream> stream) override;

void start();

libp2p::CoroOutcome<BlockResponse> request(libp2p::PeerId peer_id,
BlockRequest request);

private:
libp2p::CoroOutcome<void> coroRespond(
std::shared_ptr<libp2p::Stream> stream);

std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<libp2p::host::BasicHost> host_;
qtils::SharedRef<blockchain::BlockTree> block_tree_;
};
} // namespace lean::modules
10 changes: 4 additions & 6 deletions src/modules/networking/interfaces.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

#include <modules/shared/networking_types.tmp.hpp>

#include "modules/shared/macro.hpp"

namespace lean::modules {

struct NetworkingLoader {
Expand All @@ -21,9 +19,9 @@ namespace lean::modules {
virtual void dispatch_peer_disconnected(
std::shared_ptr<const messages::PeerDisconnectedMessage> msg) = 0;

virtual void dispatch_StatusMessageReceived(
virtual void dispatchStatusMessageReceived(
std::shared_ptr<const messages::StatusMessageReceived> message) = 0;
virtual void dispatch_SignedVoteReceived(
virtual void dispatchSignedVoteReceived(
std::shared_ptr<const messages::SignedVoteReceived> message) = 0;
};

Expand All @@ -34,9 +32,9 @@ namespace lean::modules {

virtual void on_loading_is_finished() = 0;

virtual void on_dispatch_SendSignedBlock(
virtual void onSendSignedBlock(
std::shared_ptr<const messages::SendSignedBlock> message) = 0;
virtual void on_dispatch_SendSignedVote(
virtual void onSendSignedVote(
std::shared_ptr<const messages::SendSignedVote> message) = 0;
};

Expand Down
Loading
Loading