diff --git a/include/libp2p/protocol/gossip/config.hpp b/include/libp2p/protocol/gossip/config.hpp index 95d3ff9..f61e7f5 100644 --- a/include/libp2p/protocol/gossip/config.hpp +++ b/include/libp2p/protocol/gossip/config.hpp @@ -49,6 +49,8 @@ namespace libp2p::protocol::gossip { std::optional seqno; TopicHash topic; std::optional signature; + + std::optional received_from; }; using MessageIdFn = std::function; diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index b7676b8..bb17021 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -146,13 +146,14 @@ namespace libp2p::protocol::gossip { public: /** Receive next message published to this topic (awaitable). */ CoroOutcome receive(); + CoroOutcome receiveMessage(); /** Publish a payload to this topic (signed locally). */ void publish(BytesIn message); /** Count outbound peers currently in the mesh. */ size_t meshOutCount(); std::weak_ptr weak_gossip_; TopicHash topic_hash_; - CoroOutcomeChannel receive_channel_; + CoroOutcomeChannel receive_channel_; History history_; TopicBackoff backoff_; std::unordered_set peers_; // all peers subscribed diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index c202e2d..4bc0d79 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -83,6 +83,7 @@ namespace libp2p::transport::lsquic { * Stream read operation arguments. */ std::optional> reading; + std::optional> writing; bool want_flush = false; }; diff --git a/include/libp2p/transport/quic/error.hpp b/include/libp2p/transport/quic/error.hpp index c9fb5e9..1eb15d8 100644 --- a/include/libp2p/transport/quic/error.hpp +++ b/include/libp2p/transport/quic/error.hpp @@ -14,6 +14,7 @@ namespace libp2p { CONN_CLOSED, STREAM_CLOSED, STREAM_READ_IN_PROGRESS, + STREAM_WRITE_IN_PROGRESS, TOO_MANY_STREAMS, CANT_CREATE_CONNECTION, CANT_OPEN_STREAM, @@ -29,6 +30,8 @@ namespace libp2p { return "STREAM_CLOSED"; case E::STREAM_READ_IN_PROGRESS: return "STREAM_READ_IN_PROGRESS"; + case E::STREAM_WRITE_IN_PROGRESS: + return "STREAM_WRITE_IN_PROGRESS"; case E::TOO_MANY_STREAMS: return "TOO_MANY_STREAMS"; case E::CANT_CREATE_CONNECTION: diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index df13656..9ff8dfa 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -183,6 +183,11 @@ namespace libp2p::protocol::gossip { // Receive next message payload from a subscribed topic (for local consumer). CoroOutcome Topic::receive() { + BOOST_OUTCOME_CO_TRY(auto message, co_await receiveMessage()); + co_return message.data; + } + + CoroOutcome Topic::receiveMessage() { co_return co_await receive_channel_.receive(); } @@ -554,6 +559,8 @@ namespace libp2p::protocol::gossip { message->topic = qtils::ByteVec(qtils::str2byte(pb_publish.topic())); + message->received_from = peer->peer_id_; + auto topic_it = topics_.find(message->topic); if (topic_it == topics_.end()) { continue; @@ -565,7 +572,7 @@ namespace libp2p::protocol::gossip { continue; } score_.validateMessage(peer->peer_id_, message_id, message->topic); - topic->receive_channel_.send(message->data); + topic->receive_channel_.send(*message); score_.deliver_message(peer->peer_id_, message_id, message->topic); broadcast(*topic, peer->peer_id_, message_id, message); } @@ -751,6 +758,74 @@ namespace libp2p::protocol::gossip { return peer->batch_.value(); } + inline std::vector splitBatch(const Rpc &message) { + std::vector pb_messages; + + if (not message.subscriptions.empty() or not message.graft.empty() + or not message.prune.empty() or not message.ihave.empty() + or not message.iwant.empty() or not message.idontwant.empty()) { + gossipsub::pb::RPC pb_message; + + for (auto &[topic_hash, subscribe] : message.subscriptions) { + auto &pb_subscription = *pb_message.add_subscriptions(); + *pb_subscription.mutable_topic_id() = qtils::byte2str(topic_hash); + pb_subscription.set_subscribe(subscribe); + } + + for (auto &topic_hash : message.graft) { + auto &pb_graft = *pb_message.mutable_control()->add_graft(); + *pb_graft.mutable_topic_id() = qtils::byte2str(topic_hash); + } + + for (auto &[topic_hash, backoff] : message.prune) { + auto &pb_prune = *pb_message.mutable_control()->add_prune(); + *pb_prune.mutable_topic_id() = qtils::byte2str(topic_hash); + if (backoff.has_value()) { + pb_prune.set_backoff(backoff->count()); + } + } + + for (auto &[topic_hash, messages] : message.ihave) { + auto &pb_ihave = *pb_message.mutable_control()->add_ihave(); + *pb_ihave.mutable_topic_id() = qtils::byte2str(topic_hash); + auto &pb_messages = *pb_ihave.mutable_message_ids(); + for (auto &message : messages) { + *pb_messages.Add() = qtils::byte2str(message); + } + } + + if (not message.iwant.empty()) { + auto &pb_messages = + *pb_message.mutable_control()->add_iwant()->mutable_message_ids(); + for (auto &message : message.iwant) { + *pb_messages.Add() = qtils::byte2str(message); + } + } + + if (not message.idontwant.empty()) { + auto &pb_messages = *pb_message.mutable_control() + ->add_idontwant() + ->mutable_message_ids(); + for (auto &message : message.idontwant) { + *pb_messages.Add() = qtils::byte2str(message); + } + } + + pb_messages.emplace_back(protobufEncode(pb_message)); + } + + for (auto &publish : message.publish) { + gossipsub::pb::RPC pb_message; + + auto &pb_publish = *pb_message.add_publish(); + toProtobuf(pb_publish, *publish); + + pb_messages.emplace_back(protobufEncode(pb_message)); + } + + return pb_messages; + } + // Writer coroutine: build RPC from batch and send via varint-length framing. void Gossip::checkWrite(const PeerPtr &peer) { if (peer->writing_) { @@ -763,79 +838,23 @@ namespace libp2p::protocol::gossip { return; } peer->writing_ = true; - coroSpawn(*io_context_, [WEAK_SELF, peer]() -> Coro { + coroSpawn(*io_context_, [peer]() -> Coro { co_await coroYield(); assert(peer->writing_); assert(peer->stream_out_.has_value()); while (auto message = qtils::optionTake(peer->batch_)) { - auto self = weak_self.lock(); - if (not self) { - break; - } - - gossipsub::pb::RPC pb_message; - assert(not message->subscriptions.empty() - or not message->publish.empty() or not message->graft.empty() - or not message->prune.empty() or not message->ihave.empty() - or not message->iwant.empty() or not message->idontwant.empty()); - - for (auto &[topic_hash, subscribe] : message->subscriptions) { - auto &pb_subscription = *pb_message.add_subscriptions(); - *pb_subscription.mutable_topic_id() = qtils::byte2str(topic_hash); - pb_subscription.set_subscribe(subscribe); - } - - for (auto &publish : message->publish) { - auto &pb_publish = *pb_message.add_publish(); - toProtobuf(pb_publish, *publish); - } - - for (auto &topic_hash : message->graft) { - auto &pb_graft = *pb_message.mutable_control()->add_graft(); - *pb_graft.mutable_topic_id() = qtils::byte2str(topic_hash); - } - - for (auto &[topic_hash, backoff] : message->prune) { - auto &pb_prune = *pb_message.mutable_control()->add_prune(); - *pb_prune.mutable_topic_id() = qtils::byte2str(topic_hash); - if (backoff.has_value()) { - pb_prune.set_backoff(backoff->count()); - } - } - - for (auto &[topic_hash, messages] : message->ihave) { - auto &pb_ihave = *pb_message.mutable_control()->add_ihave(); - *pb_ihave.mutable_topic_id() = qtils::byte2str(topic_hash); - auto &pb_messages = *pb_ihave.mutable_message_ids(); - for (auto &message : messages) { - *pb_messages.Add() = qtils::byte2str(message); - } - } - - if (not message->iwant.empty()) { - auto &pb_messages = - *pb_message.mutable_control()->add_iwant()->mutable_message_ids(); - for (auto &message : message->iwant) { - *pb_messages.Add() = qtils::byte2str(message); - } - } - - if (not message->idontwant.empty()) { - auto &pb_messages = *pb_message.mutable_control() - ->add_idontwant() - ->mutable_message_ids(); - for (auto &message : message->idontwant) { - *pb_messages.Add() = qtils::byte2str(message); + auto pb_messages = splitBatch(*message); + + for (auto &encoded : pb_messages) { + assert(not encoded.empty()); + auto r = + co_await writeVarintMessage(peer->stream_out_.value(), encoded); + if (not r.has_value()) { + peer->stream_out_.reset(); + break; } } - - auto encoded = protobufEncode(pb_message); - assert(not encoded.empty()); - self.reset(); - auto r = - co_await writeVarintMessage(peer->stream_out_.value(), encoded); - if (not r.has_value()) { - peer->stream_out_.reset(); + if (not peer->stream_out_.has_value()) { break; } } diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 9654c13..3544001 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -169,6 +169,9 @@ namespace libp2p::transport::lsquic { if (auto reading = qtils::optionTake(stream_ctx->reading)) { reading.value()(); } + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } if (auto stream = stream_ctx->stream.lock()) { stream->onClose(); } @@ -184,6 +187,15 @@ namespace libp2p::transport::lsquic { reading.value()(); } }; + stream_if.on_write = + +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { + lsquic_stream_wantwrite(stream, 0); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto stream_ctx = reinterpret_cast(_stream_ctx); + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } + }; lsquic_engine_api api{}; api.ea_settings = &settings; diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 74db89b..2d37a38 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -60,10 +60,28 @@ namespace libp2p::connection { if (not stream_ctx_) { co_return r; } - auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size()); - if (n > 0) { - r = n; + if (stream_ctx_->writing) { + co_return QuicError::STREAM_WRITE_IN_PROGRESS; + } + while (true) { + // Missing from `lsquic_stream_write` documentation comment. + // Return value 0 means buffer is full. + // Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write` + // callback, before calling `lsquic_stream_write` again. + auto n = + lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size()); stream_ctx_->engine->wantFlush(stream_ctx_); + if (n == 0) { + co_await coroHandler([&](CoroHandler &&handler) { + stream_ctx_->writing.emplace(std::move(handler)); + lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1); + }); + continue; + } + if (n > 0) { + r = n; + } + break; } co_return r; }