diff --git a/include/libp2p/host/basic_host.hpp b/include/libp2p/host/basic_host.hpp index 13e891d2..fbfeb55c 100644 --- a/include/libp2p/host/basic_host.hpp +++ b/include/libp2p/host/basic_host.hpp @@ -224,6 +224,11 @@ namespace libp2p::host { */ event::Bus &getBus(); + /** + * Get list of protocols that were passed to `listenProtocol`. + */ + StreamProtocols getSupportedProtocols() const; + private: std::shared_ptr idmgr_; std::shared_ptr listener_; diff --git a/include/libp2p/protocol/identify.hpp b/include/libp2p/protocol/identify.hpp new file mode 100644 index 00000000..96032db9 --- /dev/null +++ b/include/libp2p/protocol/identify.hpp @@ -0,0 +1,77 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include + +namespace boost::asio { + class io_context; +} // namespace boost::asio + +namespace libp2p::connection { + class CapableConnection; +} // namespace libp2p::connection + +namespace libp2p::host { + class BasicHost; +} // namespace libp2p::host + +namespace libp2p::peer { + class IdentityManager; +} // namespace libp2p::peer + +namespace libp2p::protocol { + struct IdentifyInfo { + PeerId peer_id; + std::string protocol_version; + std::string agent_version; + std::vector listen_addresses; + Multiaddress observed_address; + StreamProtocols protocols; + }; + + using OnIdentifyChannel = event::channel_decl; + + struct IdentifyConfig { + // Fixes default field values with boost::di. + IdentifyConfig() = default; + + std::string protocol_version; + std::string agent_version; + std::vector listen_addresses; + }; + + class Identify : public std::enable_shared_from_this, + public BaseProtocol { + public: + Identify(std::shared_ptr io_context, + std::shared_ptr host, + std::shared_ptr id_mgr, + IdentifyConfig config); + + // Adaptor + StreamProtocols getProtocolIds() const override; + + // BaseProtocol + void handle(std::shared_ptr stream) override; + + void start(); + + private: + Coro recv_identify( + std::shared_ptr connection); + + std::shared_ptr io_context_; + std::shared_ptr host_; + std::shared_ptr id_mgr_; + IdentifyConfig config_; + event::Handle on_peer_connected_sub_; + }; +} // namespace libp2p::protocol diff --git a/include/libp2p/protocol/ping.hpp b/include/libp2p/protocol/ping.hpp new file mode 100644 index 00000000..c5207906 --- /dev/null +++ b/include/libp2p/protocol/ping.hpp @@ -0,0 +1,80 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +namespace boost::asio { + class io_context; +} // namespace boost::asio + +namespace libp2p::connection { + class CapableConnection; +} // namespace libp2p::connection + +namespace libp2p::crypto::random { + class RandomGenerator; +} // namespace libp2p::crypto::random + +namespace libp2p::host { + class BasicHost; +} // namespace libp2p::host + +namespace libp2p::protocol { + struct PingConfig { + // Fixes default field values with boost::di. + PingConfig() = default; + + /** + * Time to wait for response. + */ + std::chrono::seconds timeout{20}; + /** + * Time between ping requests. + */ + std::chrono::seconds interval{15}; + }; + + class Ping : public std::enable_shared_from_this, public BaseProtocol { + public: + enum Error { + INVALID_RESPONSE, + }; + Q_ENUM_ERROR_CODE_FRIEND(Error) { + using E = decltype(e); + switch (e) { + case E::INVALID_RESPONSE: + return "Ping received invalid response"; + } + abort(); + } + + Ping(std::shared_ptr io_context, + std::shared_ptr host, + std::shared_ptr random, + PingConfig config); + + // Adaptor + StreamProtocols getProtocolIds() const override; + + // BaseProtocol + void handle(std::shared_ptr stream) override; + + void start(); + + private: + Coro ping(std::shared_ptr connection); + + std::shared_ptr io_context_; + std::shared_ptr host_; + std::shared_ptr random_; + PingConfig config_; + event::Handle on_peer_connected_sub_; + }; +} // namespace libp2p::protocol diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 919ea19b..158598f2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -21,48 +21,50 @@ libp2p_add_library(libp2p INTERFACE) # Start with basic component libraries and essential dependencies target_link_libraries(libp2p INTERFACE + Boost::boost + Boost::Boost.DI + fmt::fmt + libsecp256k1::secp256k1 + lsquic::lsquic + OpenSSL::Crypto + OpenSSL::SSL p2p_basic_host + p2p_byteutil + p2p_connection_manager + p2p_crypto_provider + p2p_dialer + p2p_ecdsa_provider + p2p_ed25519_provider p2p_gossip - p2p_peer_repository + p2p_hmac_provider + p2p_identity_manager p2p_inmem_address_repository p2p_inmem_key_repository - p2p_read_buffer - p2p_literals - p2p_varint_prefix_reader - p2p_dialer - p2p_connection_manager - p2p_transport_manager + p2p_inmem_protocol_repository + p2p_key_validator p2p_listener_manager - p2p_identity_manager + p2p_literals p2p_logger - p2p_peer_id p2p_multiaddress - p2p_byteutil + p2p_multibase_codec p2p_multiselect p2p_peer_address - p2p_multibase_codec + p2p_peer_id + p2p_peer_repository + p2p_protocol_identify + p2p_protocol_ping + p2p_quic p2p_random_generator - p2p_crypto_provider - p2p_secp256k1_provider - p2p_ecdsa_provider - p2p_key_validator - p2p_ed25519_provider - p2p_inmem_protocol_repository + p2p_read_buffer p2p_rsa_provider - p2p_hmac_provider + p2p_secp256k1_provider p2p_tls - p2p_quic - Boost::boost - fmt::fmt - OpenSSL::SSL - OpenSSL::Crypto + p2p_transport_manager + p2p_varint_prefix_reader + protobuf::libprotobuf soralog::soralog yaml-cpp::yaml-cpp - lsquic::lsquic ZLIB::ZLIB - libsecp256k1::secp256k1 - protobuf::libprotobuf - Boost::Boost.DI ) target_include_directories(libp2p diff --git a/src/host/basic_host.cpp b/src/host/basic_host.cpp index 23a0efef..cba6791e 100644 --- a/src/host/basic_host.cpp +++ b/src/host/basic_host.cpp @@ -235,4 +235,7 @@ namespace libp2p::host { connection_manager_->closeConnectionsToPeer(peer_id); } + StreamProtocols BasicHost::getSupportedProtocols() const { + return listener_->getSupportedProtocols(); + } } // namespace libp2p::host diff --git a/src/protocol/CMakeLists.txt b/src/protocol/CMakeLists.txt index ac7efc30..25aed42a 100644 --- a/src/protocol/CMakeLists.txt +++ b/src/protocol/CMakeLists.txt @@ -1,2 +1,11 @@ add_subdirectory(echo) add_subdirectory(gossip) +add_subdirectory(identify) + +libp2p_add_library(p2p_protocol_ping + ping.cpp + ) +target_link_libraries(p2p_protocol_ping + Boost::boost + p2p_logger + ) diff --git a/src/protocol/identify/CMakeLists.txt b/src/protocol/identify/CMakeLists.txt new file mode 100644 index 00000000..262289c8 --- /dev/null +++ b/src/protocol/identify/CMakeLists.txt @@ -0,0 +1,19 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + + +add_proto_library(p2p_identify_proto + identify.proto + ) + +libp2p_add_library(p2p_protocol_identify + identify.cpp + ) +target_link_libraries(p2p_protocol_identify + Boost::boost + p2p_identify_proto + p2p_logger + ) diff --git a/src/protocol/identify/identify.cpp b/src/protocol/identify/identify.cpp new file mode 100644 index 00000000..bc735dd9 --- /dev/null +++ b/src/protocol/identify/identify.cpp @@ -0,0 +1,129 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace libp2p::protocol { + Identify::Identify(std::shared_ptr io_context, + std::shared_ptr host, + std::shared_ptr id_mgr, + IdentifyConfig config) + : io_context_{std::move(io_context)}, + host_{std::move(host)}, + id_mgr_{std::move(id_mgr)}, + config_{std::move(config)} {} + + StreamProtocols Identify::getProtocolIds() const { + return {"/ipfs/id/1.0.0"}; + } + + void Identify::handle(std::shared_ptr stream) { + libp2p_identify_pb::Identify pb_message; + *pb_message.mutable_protocolversion() = config_.protocol_version; + *pb_message.mutable_agentversion() = config_.agent_version; + *pb_message.mutable_publickey() = + qtils::byte2str(crypto::marshaller::KeyMarshaller{nullptr} + .marshal(id_mgr_->getKeyPair().publicKey) + .value() + .key); + for (auto &address : config_.listen_addresses) { + *pb_message.add_listenaddrs() = + qtils::byte2str(address.getBytesAddress()); + } + *pb_message.mutable_observedaddr() = + qtils::byte2str(stream->remoteMultiaddr().value().getBytesAddress()); + for (auto &protocol : host_->getSupportedProtocols()) { + pb_message.add_protocols(protocol); + } + coroSpawn(*io_context_, + [stream, encoded{protobufEncode(pb_message)}]() -> Coro { + std::ignore = co_await writeVarintMessage(stream, encoded); + stream->reset(); + }); + } + + void Identify::start() { + host_->listenProtocol(shared_from_this()); + auto on_peer_connected = + [WEAK_SELF]( + std::weak_ptr weak_connection) { + WEAK_LOCK(connection); + WEAK_LOCK(self); + coroSpawn(*self->io_context_, [self, connection]() -> Coro { + co_await self->recv_identify(connection); + }); + }; + on_peer_connected_sub_ = + host_->getBus() + .getChannel() + .subscribe(on_peer_connected); + } + + Coro Identify::recv_identify( + std::shared_ptr connection) { + auto peer_id = connection->remotePeer(); + co_await coroYield(); + auto stream_result = + co_await host_->newStream(connection, getProtocolIds()); + if (not stream_result.has_value()) { + co_return; + } + auto &stream = stream_result.value(); + std::ignore = co_await [&]() -> CoroOutcome { + Bytes encoded; + BOOST_OUTCOME_CO_TRY(co_await readVarintMessage(stream, encoded)); + BOOST_OUTCOME_CO_TRY( + auto pb_message, + protobufDecode(encoded)); + BOOST_OUTCOME_CO_TRY( + auto observed_address, + Multiaddress::create(qtils::str2byte(pb_message.observedaddr()))); + IdentifyInfo message{ + peer_id, + pb_message.protocolversion(), + pb_message.agentversion(), + {}, + observed_address, + {}, + }; + for (auto &pb_address : pb_message.listenaddrs()) { + BOOST_OUTCOME_CO_TRY(auto address, + Multiaddress::create(qtils::str2byte(pb_address))); + message.listen_addresses.emplace_back(address); + } + for (auto &protocol : pb_message.protocols()) { + message.protocols.emplace_back(protocol); + } + + auto &peer_repo = host_->getPeerRepository(); + peer_repo.getProtocolRepository() + .addProtocols(peer_id, message.protocols) + .value(); + auto &address_repo = peer_repo.getAddressRepository(); + address_repo + .upsertAddresses( + peer_id, message.listen_addresses, peer::ttl::kRecentlyConnected) + .value(); + + host_->getBus().getChannel().publish(message); + co_return outcome::success(); + }(); + stream->reset(); + } +} // namespace libp2p::protocol diff --git a/src/protocol/identify/identify.proto b/src/protocol/identify/identify.proto new file mode 100644 index 00000000..68388b6d --- /dev/null +++ b/src/protocol/identify/identify.proto @@ -0,0 +1,35 @@ +syntax = "proto2"; + +package libp2p_identify_pb; + +message Identify { + // protocolVersion determines compatibility between peers + optional string protocolVersion = 5; // e.g. ipfs/1.0.0 + + // agentVersion is like a UserAgent string in browsers, or client version in bittorrent + // includes the client name and client. + optional string agentVersion = 6; // e.g. go-ipfs/0.1.0 + + // publicKey is this node's public key (which also gives its node.ID) + // - may not need to be sent, as secure channel implies it has been sent. + // - then again, if we change / disable secure channel, may still want it. + optional bytes publicKey = 1; + + // listenAddrs are the multiaddrs the sender node listens for open connections on + repeated bytes listenAddrs = 2; + + // observedAddr is the multiaddr of the remote endpoint that the sender node perceives + // this is useful information to convey to the other side, as it helps the remote endpoint + // determine whether its connection to the local peer goes through NAT. + optional bytes observedAddr = 4; + + repeated string protocols = 3; + + // signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord, + // signed by the sending node. It contains the same addresses as the listenAddrs field, but + // in a form that lets us share authenticated addrs with other peers. + // see https://github.com/libp2p/rust-libp2p/blob/8ac5b5aac5f5c25a85f1065e292deeaf58290189/core/src/generated/envelope.proto#L12 + // and https://github.com/libp2p/rust-libp2p/blob/8ac5b5aac5f5c25a85f1065e292deeaf58290189/core/src/generated/peer_record.proto#L11 + // for message definitions. + optional bytes signedPeerRecord = 8; +} diff --git a/src/protocol/ping.cpp b/src/protocol/ping.cpp new file mode 100644 index 00000000..f6036f13 --- /dev/null +++ b/src/protocol/ping.cpp @@ -0,0 +1,111 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace libp2p::protocol { + constexpr size_t kPingSize = 32; + using PingMessage = qtils::ByteArr; + + Ping::Ping(std::shared_ptr io_context, + std::shared_ptr host, + std::shared_ptr random, + PingConfig config) + : io_context_{std::move(io_context)}, + host_{std::move(host)}, + random_{std::move(random)}, + config_{std::move(config)} {} + + StreamProtocols Ping::getProtocolIds() const { + return {"/ipfs/ping/1.0.0"}; + } + + void Ping::handle(std::shared_ptr stream) { + coroSpawn(*io_context_, [stream]() -> Coro { + PingMessage message; + while (true) { + auto r = co_await read(stream, message); + if (not r.has_value()) { + break; + } + r = co_await write(stream, message); + if (not r.has_value()) { + break; + } + } + }); + } + + void Ping::start() { + host_->listenProtocol(shared_from_this()); + auto on_peer_connected = + [WEAK_SELF]( + std::weak_ptr weak_connection) { + WEAK_LOCK(connection); + WEAK_LOCK(self); + coroSpawn(*self->io_context_, [self, connection]() -> Coro { + co_await self->ping(connection); + }); + }; + on_peer_connected_sub_ = + host_->getBus() + .getChannel() + .subscribe(on_peer_connected); + } + + Coro Ping::ping( + std::shared_ptr connection) { + co_await coroYield(); + boost::asio::steady_timer timer{*io_context_}; + std::shared_ptr stream; + while (true) { + if (stream == nullptr) { + auto stream_result = + co_await host_->newStream(connection, getProtocolIds()); + if (not stream_result.has_value()) { + break; + } + stream = stream_result.value(); + } + PingMessage message; + random_->fillRandomly(message); + timer.expires_after(config_.timeout); + timer.async_wait([stream](boost::system::error_code ec) { + if (not ec) { + stream->reset(); + } + }); + auto r = co_await write(stream, message); + if (r.has_value()) { + PingMessage reply; + r = co_await read(stream, reply); + if (r.has_value()) { + if (reply != message) { + r = Error::INVALID_RESPONSE; + } + } + } + if (not r.has_value()) { + stream->reset(); + stream.reset(); + } + timer.cancel(); + timer.expires_after(config_.interval); + co_await timer.async_wait(boost::asio::use_awaitable); + } + } +} // namespace libp2p::protocol diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 5171f0b8..9654c135 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -19,6 +19,20 @@ #include namespace libp2p::transport::lsquic { + inline boost::asio::ip::udp::endpoint endpointFrom(const sockaddr *raw) { + boost::asio::ip::udp::endpoint endpoint; + size_t size = 0; + if (raw->sa_family == AF_INET) { + size = sizeof(sockaddr_in); + } else if (raw->sa_family == AF_INET6) { + size = sizeof(sockaddr_in6); + } else { + throw std::logic_error{"endpointFrom expects IPv4 or IPv6 address"}; + } + memcpy(endpoint.data(), raw, size); + return endpoint; + } + Engine::Engine(std::shared_ptr io_context, std::shared_ptr ssl_context, const muxer::MuxedConnectionConfig &mux_config, @@ -102,12 +116,15 @@ namespace libp2p::transport::lsquic { if (op and info.peer_id != op->peer) { return security::TlsError::TLS_UNEXPECTED_PEER_ID; } + const sockaddr *local_sockaddr = nullptr; + const sockaddr *peer_sockaddr = nullptr; + lsquic_conn_get_sockaddr(conn, &local_sockaddr, &peer_sockaddr); auto conn = std::make_shared( self->io_context_, conn_ctx, op.has_value(), self->local_, - detail::makeQuicAddr(op->remote).value(), + detail::makeQuicAddr(endpointFrom(peer_sockaddr)).value(), self->local_peer_, info.peer_id, info.public_key);