Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Commit

Permalink
feat(waku-relay): fork gossipsub on libp2p-gossipsub-v0.44.4 (2d9ae38) (
Browse files Browse the repository at this point in the history
  • Loading branch information
LNSD authored May 18, 2023
1 parent f12392f commit 7604aa7
Show file tree
Hide file tree
Showing 35 changed files with 10,530 additions and 10 deletions.
4 changes: 3 additions & 1 deletion waku-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ byte-unit = { version = "4.0.19", default-features = false, features = ["std"] }
bytes = "1.4.0"
hex = "0.4.3"
prost = "0.11.9"
unsigned-varint = "0.7.1"
quick-protobuf = "0.8"
thiserror.workspace = true
unsigned-varint = { version = "0.7.1", features = ["asynchronous-codec"] }
1 change: 1 addition & 0 deletions waku-core/src/common.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod protobuf_codec;
pub mod quick_protobuf_codec;
78 changes: 78 additions & 0 deletions waku-core/src/common/quick_protobuf_codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Code borrowed from: https://github.com/libp2p/rust-libp2p/blob/master/misc/quick-protobuf-codec/

#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use asynchronous_codec::{Decoder, Encoder};
use bytes::{Bytes, BytesMut};
use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer};
use std::marker::PhantomData;
use unsigned_varint::codec::UviBytes;

/// [`Codec`] implements [`Encoder`] and [`Decoder`], uses [`unsigned_varint`]
/// to prefix messages with their length and uses [`quick_protobuf`] and a provided
/// `struct` implementing [`MessageRead`] and [`MessageWrite`] to do the encoding.
pub struct Codec<In, Out = In> {
uvi: UviBytes,
phantom: PhantomData<(In, Out)>,
}

impl<In, Out> Codec<In, Out> {
/// Create new [`Codec`].
///
/// Parameter `max_message_len_bytes` determines the maximum length of the
/// Protobuf message. The limit does not include the bytes needed for the
/// [`unsigned_varint`].
pub fn new(max_message_len_bytes: usize) -> Self {
let mut uvi = UviBytes::default();
uvi.set_max_len(max_message_len_bytes);
Self {
uvi,
phantom: PhantomData::default(),
}
}
}

impl<In: MessageWrite, Out> Encoder for Codec<In, Out> {
type Item = In;
type Error = Error;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut encoded_msg = Vec::new();
let mut writer = Writer::new(&mut encoded_msg);
item.write_message(&mut writer)
.expect("Encoding to succeed");
self.uvi.encode(Bytes::from(encoded_msg), dst)?;

Ok(())
}
}

impl<In, Out> Decoder for Codec<In, Out>
where
Out: for<'a> MessageRead<'a>,
{
type Item = Out;
type Error = Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let msg = match self.uvi.decode(src)? {
None => return Ok(None),
Some(msg) => msg,
};

let mut reader = BytesReader::from_bytes(&msg);
let message = Self::Item::from_reader(&mut reader, &msg)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(message))
}
}

#[derive(thiserror::Error, Debug)]
#[error("Failed to encode/decode message")]
pub struct Error(#[from] std::io::Error);

impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
e.0
}
}
39 changes: 37 additions & 2 deletions waku-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,48 @@ version = "0.1.0"
edition = "2021"

[dependencies]
either = "1.5"
libp2p = { version = "0.51.3", features = ["macros"] }
bytes = "1.4.0"
libp2p = { workspace = true, features = ["gossipsub", "macros"] }
prost = "0.11.9"
byteorder = "1.3.4"
fnv = "1.0.7"
futures = "0.3.28"
rand = "0.8"
asynchronous-codec = "0.6"
unsigned-varint = { version = "0.7.0", features = ["asynchronous_codec"] }
log = "0.4.11"
sha2 = "0.10.6"
base64 = "0.21.0"
smallvec = "1.6.1"
strum_macros = "0.24.3"
prost = "0.11.9"
quick-protobuf = "0.8"
#quick-protobuf-codec = { version = "0.1", path = "../../misc/quick-protobuf-codec" }
hex_fmt = "0.3.0"
regex = "1.8.1"
serde = { version = "1", optional = true, features = ["derive"] }
thiserror = { workspace = true }
wasm-timer = "0.2.5"
instant = "0.1.11"
void = "1.0.2"
# Metrics dependencies
prometheus-client = "0.19.0"
waku-core = { version = "0.1.0", path = "../waku-core" }

[dev-dependencies]
async-std = { version = "1.6.3", features = ["unstable"] }
env_logger = "0.10.0"
hex = "0.4.2"
hex-literal = "0.4.1"
#libp2p-core = { path = "../../core"}
#libp2p-mplex = { path = "../../muxers/mplex"}
#libp2p-noise = { path = "../../transports/noise"}
#libp2p-swarm-test = { path = "../../swarm-test"}
#quickcheck-ext = { path = "../../misc/quickcheck-ext" }

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
rustc-args = ["--cfg", "docsrs"]
5 changes: 3 additions & 2 deletions waku-relay/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use libp2p::gossipsub::{IdentTopic, MessageAuthenticity, MessageId, ValidationMode};
use crate::gossipsub::{IdentTopic, MessageAuthenticity, MessageId, ValidationMode};
use libp2p::swarm::NetworkBehaviour;
use libp2p::{gossipsub, PeerId};
use libp2p::PeerId;
use prost::Message;

use waku_core::message::proto::waku::message::v1::WakuMessage as WakuMessageProto;
Expand All @@ -9,6 +9,7 @@ use waku_core::pubsub_topic::PubsubTopic;

use crate::error::{PublishError, SubscriptionError};
use crate::event::Event;
use crate::gossipsub;
use crate::message_id::deterministic_message_id_fn;
use crate::proto::MAX_WAKU_RELAY_MESSAGE_SIZE;

Expand Down
3 changes: 1 addition & 2 deletions waku-relay/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Error types that can result from Waku relay.
use libp2p::gossipsub;

use crate::error::PublishError::{Duplicate, GossipsubError, InsufficientPeers, MessageTooLarge};
use crate::error::SubscriptionError::NotAllowed;
use crate::gossipsub;

/// Error associated with publishing a Waku message.
#[derive(Debug, thiserror::Error)]
Expand Down
3 changes: 2 additions & 1 deletion waku-relay/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use libp2p::{gossipsub, PeerId};
use libp2p::PeerId;
use prost::Message;
use strum_macros::Display;

use crate::gossipsub;
use waku_core::message::proto::waku::message::v1::WakuMessage as WakuMessageProto;
use waku_core::message::WakuMessage;
use waku_core::message::MAX_WAKU_MESSAGE_SIZE;
Expand Down
Loading

0 comments on commit 7604aa7

Please sign in to comment.