Skip to content

Initial ValidationOracle implementation #331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,283 changes: 1,004 additions & 279 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repository = "https://github.com/ethereum/trin"
discv5 = { git = "https://github.com/sigp/discv5.git", branch = "master" }
ethereum-types = "0.12.1"
hex = "0.4.3"
log = "0.4.14"
log = "0.4.17"
prometheus_exporter = "0.8.4"
rand = "0.8.4"
rlp = "0.5.0"
Expand Down
2 changes: 1 addition & 1 deletion ethportal-peertest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ rust-version = "1.58.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.52"
anyhow = "1.0.57"
clap = "2.33.3"
discv5 = { git = "https://github.com/sigp/discv5.git", branch = "master" }
futures = "0.3.21"
Expand Down
2 changes: 1 addition & 1 deletion ethportal-peertest/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ pub fn make_ipc_request(ipc_path: &str, request: &JsonRpcRequest) -> anyhow::Res
stream.flush().unwrap();
let deser = serde_json::Deserializer::from_reader(stream);
let next_obj = deser.into_iter::<Value>().next();
let response_obj = next_obj.ok_or(anyhow!("Empty JsonRpc response"))?;
let response_obj = next_obj.ok_or_else(|| anyhow!("Empty JsonRpc response"))?;
get_response_result(response_obj)
}

Expand Down
2 changes: 1 addition & 1 deletion ethportal-peertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn launch_node(id: u16, bootnode_enr: Option<&SszEnr>) -> anyhow::Resu
Some(enr) => {
let external_addr = format!(
"{}:{}",
enr.ip().expect("bootnode must have IP"),
enr.ip4().expect("bootnode must have IP"),
discovery_port
);
let enr_base64 = enr.to_base64();
Expand Down
1 change: 1 addition & 0 deletions newsfragments/331.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Initial HeaderOracle implementation.
15 changes: 11 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use log::debug;
use tokio::sync::mpsc;
Expand All @@ -14,6 +14,7 @@ use trin_core::{
discovery::Discovery, events::PortalnetEvents, storage::PortalStorage,
types::messages::PortalnetConfig,
},
types::validation::HeaderOracle,
utils::bootnodes::parse_bootnodes,
utp::stream::UtpListener,
};
Expand All @@ -22,12 +23,11 @@ use trin_state::initialize_state_network;

pub async fn run_trin(
trin_config: TrinConfig,
infura_project_id: String,
infura_url: String,
) -> Result<Arc<JsonRpcExiter>, Box<dyn std::error::Error>> {
trin_config.display_config();

let bootnode_enrs = parse_bootnodes(&trin_config.bootnodes)?;

let portalnet_config = PortalnetConfig {
external_addr: trin_config.external_addr,
private_key: trin_config.private_key.clone(),
Expand Down Expand Up @@ -60,6 +60,12 @@ pub async fn run_trin(
let storage_config =
PortalStorage::setup_config(discovery.local_enr().node_id(), trin_config.kb)?;

// Initialize validation oracle
let header_oracle = Arc::new(RwLock::new(HeaderOracle {
infura_url: infura_url.clone(),
..HeaderOracle::default()
}));

debug!("Selected networks to spawn: {:?}", trin_config.networks);
// Initialize state sub-network service and event handlers, if selected
let (state_handler, state_network_task, state_event_tx, state_utp_tx, state_jsonrpc_tx) =
Expand Down Expand Up @@ -92,6 +98,7 @@ pub async fn run_trin(
utp_listener_tx,
portalnet_config.clone(),
storage_config.clone(),
header_oracle,
)
.await
} else {
Expand All @@ -108,7 +115,7 @@ pub async fn run_trin(
tokio::task::spawn_blocking(|| {
launch_jsonrpc_server(
jsonrpc_trin_config,
infura_project_id,
infura_url,
portal_jsonrpc_tx,
live_server_tx,
json_exiter_clone,
Expand Down
16 changes: 4 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::env;

use trin_core::cli::TrinConfig;
use trin_core::{cli::TrinConfig, utils::infura::build_infura_project_url_from_env};

use trin::run_trin;

Expand All @@ -11,15 +9,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Launching trin");

let trin_config = TrinConfig::from_cli();
let infura_project_id = match env::var("TRIN_INFURA_PROJECT_ID") {
Ok(val) => val,
Err(_) => panic!(
"Must supply Infura key as environment variable, like:\n\
TRIN_INFURA_PROJECT_ID=\"your-key-here\" trin"
),
};

let exiter = run_trin(trin_config, infura_project_id).await?;
let infura_url = build_infura_project_url_from_env();

let exiter = run_trin(trin_config, infura_url).await?;

tokio::signal::ctrl_c()
.await
Expand Down
5 changes: 3 additions & 2 deletions trin-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ edition = "2021"
rust-version = "1.58.0"

[dependencies]
anyhow = "1.0.52"
anyhow = "1.0.57"
async-recursion = "1.0.0"
async-trait = "0.1.53"
base64 = "0.13.0"
bytes = "1.1.0"
clap = "2.33.3"
Expand All @@ -27,7 +28,7 @@ hmac-sha256 = "1.1.1"
httparse = "1.5.1"
keccak-hash = "0.8.0"
lazy_static = "1.4.0"
log = "0.4.14"
log = "0.4.17"
num = "0.4.0"
parking_lot = "0.11.2"
prometheus_exporter = "0.8.4"
Expand Down
28 changes: 8 additions & 20 deletions trin-core/src/jsonrpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Default for JsonRpcExiter {

pub fn launch_jsonrpc_server(
trin_config: TrinConfig,
infura_project_id: String,
infura_url: String,
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
live_server_tx: tokio::sync::mpsc::Sender<bool>,
json_rpc_exiter: Arc<JsonRpcExiter>,
Expand All @@ -68,19 +68,13 @@ pub fn launch_jsonrpc_server(
match trin_config.web3_transport.as_str() {
"ipc" => launch_ipc_client(
pool,
infura_project_id,
infura_url,
&trin_config.web3_ipc_path,
portal_tx,
live_server_tx,
json_rpc_exiter,
),
"http" => launch_http_client(
pool,
infura_project_id,
trin_config,
portal_tx,
live_server_tx,
),
"http" => launch_http_client(pool, infura_url, trin_config, portal_tx, live_server_tx),
val => panic!("Unsupported web3 transport: {}", val),
}
}
Expand Down Expand Up @@ -126,7 +120,7 @@ fn get_listener_result(ipc_path: &str) -> tokio::io::Result<uds_windows::UnixLis

fn launch_ipc_client(
pool: ThreadPool,
infura_project_id: String,
infura_url: String,
ipc_path: &str,
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
live_server_tx: tokio::sync::mpsc::Sender<bool>,
Expand Down Expand Up @@ -174,10 +168,9 @@ fn launch_ipc_client(
Err(_) => break, // Socket exited
};
debug!("New IPC client: {:?}", stream.peer_addr().unwrap());
let infura_project_id = infura_project_id.clone();
let infura_url = infura_url.clone();
let portal_tx = portal_tx.clone();
pool.execute(move || {
let infura_url = get_infura_url(&infura_project_id);
let mut rx = stream.try_clone().unwrap();
let mut tx = stream;
serve_ipc_client(&mut rx, &mut tx, &infura_url, portal_tx);
Expand All @@ -192,7 +185,7 @@ fn launch_ipc_client(

fn launch_http_client(
pool: ThreadPool,
infura_project_id: String,
infura_url: String,
trin_config: TrinConfig,
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
live_server_tx: tokio::sync::mpsc::Sender<bool>,
Expand All @@ -212,10 +205,9 @@ fn launch_http_client(
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let infura_project_id = infura_project_id.clone();
let infura_url = infura_url.clone();
let portal_tx = portal_tx.clone();
pool.execute(move || {
let infura_url = get_infura_url(&infura_project_id);
serve_http_client(stream, &infura_url, portal_tx);
});
}
Expand Down Expand Up @@ -386,7 +378,7 @@ fn dispatch_trin_request(
}

// Handle all requests served by infura
fn dispatch_infura_request(obj: JsonRequest, infura_url: &str) -> Result<String, String> {
pub fn dispatch_infura_request(obj: JsonRequest, infura_url: &str) -> Result<String, String> {
match proxy_to_url(&obj, infura_url) {
Ok(result_body) => Ok(std::str::from_utf8(&result_body).unwrap().to_owned()),
Err(err) => Err(json!({
Expand Down Expand Up @@ -447,7 +439,3 @@ fn proxy_to_url(request: &JsonRequest, url: &str) -> io::Result<Vec<u8>> {
)),
}
}

fn get_infura_url(infura_project_id: &str) -> String {
return format!("https://mainnet.infura.io:443/v3/{}", infura_project_id);
}
4 changes: 2 additions & 2 deletions trin-core/src/portalnet/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Discovery {
if let Some(ip_address) = config.enr_address {
builder.ip(ip_address);
}
builder.udp(config.listen_port);
builder.udp4(config.listen_port);
builder.build(&enr_key).unwrap()
};

Expand Down Expand Up @@ -150,7 +150,7 @@ impl Discovery {
json!({
"enr": self.discv5.local_enr().to_base64(),
"nodeId": self.discv5.local_enr().node_id().to_string(),
"ip": self.discv5.local_enr().ip().map_or("None".to_owned(), |ip| ip.to_string())
"ip": self.discv5.local_enr().ip4().map_or("None".to_owned(), |ip| ip.to_string())
})
}

Expand Down
27 changes: 22 additions & 5 deletions trin-core/src/portalnet/overlay.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use anyhow::anyhow;
use std::{collections::HashSet, marker::PhantomData, sync::Arc, time::Duration};
use std::{
collections::HashSet,
fmt::Debug,
marker::{PhantomData, Sync},
sync::Arc,
time::Duration,
};

use super::{
discovery::Discovery,
Expand All @@ -17,6 +23,7 @@ use crate::portalnet::{

use crate::{
portalnet::types::content_key::RawContentKey,
types::validation::Validator,
utp::{
stream::{UtpListenerEvent, UtpListenerRequest, UtpStream, BUF_SIZE},
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
Expand Down Expand Up @@ -68,7 +75,7 @@ impl Default for OverlayConfig {
/// implement the overlay protocol and the overlay protocol is where we can encapsulate the logic for
/// handling common network requests/responses.
#[derive(Clone)]
pub struct OverlayProtocol<TContentKey, TMetric> {
pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
/// Reference to the underlying discv5 protocol
pub discovery: Arc<Discovery>,
/// Reference to the database instance
Expand All @@ -89,10 +96,17 @@ pub struct OverlayProtocol<TContentKey, TMetric> {
phantom_content_key: PhantomData<TContentKey>,
/// Associate a metric with the overlay network.
phantom_metric: PhantomData<TMetric>,
/// Declare the Validator type for a given overlay network.
phantom_validator: PhantomData<TValidator>,
}

impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
OverlayProtocol<TContentKey, TMetric>
impl<
TContentKey: OverlayContentKey + Send + Sync,
TMetric: Metric + Send,
TValidator: 'static + Validator<TContentKey> + Send,
> OverlayProtocol<TContentKey, TMetric, TValidator>
where
<TContentKey as TryFrom<Vec<u8>>>::Error: Debug,
{
pub async fn new(
config: OverlayConfig,
Expand All @@ -101,6 +115,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
storage: Arc<RwLock<PortalStorage>>,
data_radius: U256,
protocol: ProtocolId,
validator: TValidator,
) -> Self {
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
discovery.local_enr().node_id().into(),
Expand All @@ -111,7 +126,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
)));

let data_radius = Arc::new(data_radius);
let request_tx = OverlayService::<TContentKey, TMetric>::spawn(
let request_tx = OverlayService::<TContentKey, TMetric, TValidator>::spawn(
Arc::clone(&discovery),
Arc::clone(&storage),
Arc::clone(&kbuckets),
Expand All @@ -121,6 +136,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
protocol.clone(),
utp_listener_tx.clone(),
config.enable_metrics,
validator,
)
.await
.unwrap();
Expand All @@ -135,6 +151,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
utp_listener_tx,
phantom_content_key: PhantomData,
phantom_metric: PhantomData,
phantom_validator: PhantomData,
}
}

Expand Down
Loading