From 371aef76d23e97848ccf22417a86d43cafeb54d5 Mon Sep 17 00:00:00 2001 From: wujian0327 <353981613@qq.com> Date: Fri, 14 Feb 2025 19:31:43 +0800 Subject: [PATCH 1/6] p2p --- .gitignore | 1 + aries/src/main.rs | 65 +--- aries/src/service/api/nostr_router.rs | 15 +- aries/src/service/relay_server.rs | 497 +++++++++++--------------- cert.der | Bin 0 -> 355 bytes common/src/model.rs | 8 +- gateway/src/api/mod.rs | 6 +- gateway/src/api/nostr_router.rs | 2 +- gateway/src/api/ztm_router.rs | 224 ++++++------ gateway/src/https_server.rs | 59 +-- gemini/Cargo.toml | 6 + gemini/src/lib.rs | 1 + gemini/src/p2p/client.rs | 107 ++++++ gemini/src/p2p/mod.rs | 68 ++++ gemini/src/p2p/relay.rs | 257 +++++++++++++ key.der | Bin 0 -> 138 bytes mega/src/commands/service/multi.rs | 14 +- 17 files changed, 798 insertions(+), 532 deletions(-) create mode 100644 cert.der create mode 100644 gemini/src/p2p/client.rs create mode 100644 gemini/src/p2p/mod.rs create mode 100644 gemini/src/p2p/relay.rs create mode 100644 key.der diff --git a/.gitignore b/.gitignore index d442d9b8..9d6b1a6a 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ buck-out # Gtk4 resource files **/*.gresource monobean/resources/lib + diff --git a/aries/src/main.rs b/aries/src/main.rs index cdeffde2..92c5fe44 100644 --- a/aries/src/main.rs +++ b/aries/src/main.rs @@ -1,19 +1,7 @@ use clap::Parser; use common::config::{Config, LogConfig}; -use gemini::ztm::{ - agent::{run_ztm_client, LocalZTMAgent, ZTMAgent}, - hub::LocalZTMHub, -}; -use service::{ - ca_server::run_ca_server, - relay_server::{run_relay_server, RelayOptions}, -}; -use std::{ - env, - path::PathBuf, - thread::{self}, - time::{self}, -}; +use service::relay_server::{run_relay_server, RelayOptions}; +use std::{env, path::PathBuf}; use tracing_subscriber::fmt::writer::MakeWriterExt; pub mod service; @@ -44,55 +32,6 @@ async fn main() { tracing::info!("{:?}", option); - if option.only_agent { - let (peer_id, _) = vault::init(); - let ztm_agent: LocalZTMAgent = LocalZTMAgent { - agent_port: option.ztm_agent_port, - }; - ztm_agent.clone().start_ztm_agent(); - thread::sleep(time::Duration::from_secs(3)); - run_ztm_client( - "http://gitmono.org/relay".to_string(), - config.clone(), - peer_id, - ztm_agent, - 8001, - ) - .await - } - - //Start a sub thread to ca server - let config_clone = config.clone(); - let ca_port = option.ca_port; - tokio::spawn(async move { run_ca_server(config_clone, ca_port).await }); - thread::sleep(time::Duration::from_secs(3)); - - //Start a sub thread to run ztm-hub - let ca = format!("127.0.0.1:{ca_port}"); - let ztm_hub: LocalZTMHub = LocalZTMHub { - hub_port: option.ztm_hub_port, - ca, - name: vec!["relay".to_string()], - }; - ztm_hub.clone().start_ztm_hub(); - thread::sleep(time::Duration::from_secs(3)); - - //Start a sub thread to run ztm-agent - let ztm_agent = LocalZTMAgent { - agent_port: option.ztm_agent_port, - }; - thread::sleep(time::Duration::from_secs(3)); - - match ztm_agent.get_ztm_endpoints().await { - Ok(ztm_ep_list) => { - tracing::info!("ztm agent connect success"); - tracing::info!("{} online endpoints", ztm_ep_list.len()); - } - Err(_) => { - tracing::error!("ztm agent connect failed"); - } - } - //Start relay server run_relay_server(config, option).await; } diff --git a/aries/src/service/api/nostr_router.rs b/aries/src/service/api/nostr_router.rs index 76313181..d99239d1 100644 --- a/aries/src/service/api/nostr_router.rs +++ b/aries/src/service/api/nostr_router.rs @@ -16,7 +16,6 @@ use gemini::nostr::{ }; use jupiter::storage::ztm_storage::ZTMStorage; use serde_json::Value; -use tokio::task; use uuid::Uuid; use crate::service::{relay_server::AppState, Req}; @@ -70,13 +69,13 @@ async fn recieve( storage.insert_nostr_event(ztm_nostr_event).await.unwrap(); //Event is forwarded to subscribed nodes - let nostr_event_clone = nostr_event.clone(); - let storage_clone = storage.clone(); - let ztm_agent_port = state.relay_option.clone().ztm_agent_port; - task::spawn(async move { - transfer_event_to_subscribed_nodes(storage_clone, nostr_event_clone, ztm_agent_port) - .await - }); + // let nostr_event_clone = nostr_event.clone(); + // let storage_clone = storage.clone(); + // let ztm_agent_port = state.relay_option.clone().ztm_agent_port; + // task::spawn(async move { + // transfer_event_to_subscribed_nodes(storage_clone, nostr_event_clone, ztm_agent_port) + // .await + // }); let res = RelayMessage::new_ok(nostr_event.id, true, "ok".to_string()); let value = serde_json::to_value(res).unwrap(); diff --git a/aries/src/service/relay_server.rs b/aries/src/service/relay_server.rs index 2245f56f..fa0c1b2d 100644 --- a/aries/src/service/relay_server.rs +++ b/aries/src/service/relay_server.rs @@ -1,21 +1,12 @@ -use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; -use std::time::{Duration, SystemTime}; -use axum::extract::{Query, State}; use axum::http::StatusCode; use axum::response::IntoResponse; -use axum::routing::{get, post}; +use axum::routing::get; use axum::{Json, Router}; -use callisto::{ztm_lfs_info, ztm_node, ztm_repo_info}; use clap::Parser; use common::config::Config; -use gemini::ztm::hub::{LocalHub, ZTMUserPermit, ZTMCA}; -use gemini::ztm::send_get_request_to_peer_by_tunnel; -use gemini::{ - LFSChunk, LFSInfo, LFSInfoPostBody, LFSInfoRes, Node, RelayGetParams, RelayResultRes, RepoInfo, -}; use jupiter::context::Context; use tower::ServiceBuilder; use tower_http::cors::{Any, CorsLayer}; @@ -29,24 +20,12 @@ pub struct RelayOptions { #[arg(long, default_value_t = String::from("127.0.0.1"))] pub host: String, - #[arg(long, default_value_t = String::from("127.0.0.1"))] - pub hub_host: String, - #[arg(long, default_value_t = 8001)] pub relay_port: u16, - #[arg(long, default_value_t = 7777)] - pub ztm_agent_port: u16, - - #[arg(long, default_value_t = 8888)] - pub ztm_hub_port: u16, - #[arg(long, default_value_t = 9999)] pub ca_port: u16, - #[arg(long, default_value_t = false)] - pub only_agent: bool, - #[arg(long, short)] pub config: Option, } @@ -62,6 +41,7 @@ pub async fn run_relay_server(config: Config, option: RelayOptions) { let server_url = format!("{}:{}", option.host, option.relay_port); tracing::info!("start relay server: {server_url}"); + tokio::spawn(async move { gemini::p2p::relay::run(option.host, option.relay_port).await }); let addr = SocketAddr::from_str(&server_url).unwrap(); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); axum::serve(listener, app.into_make_service()) @@ -75,8 +55,6 @@ pub async fn app(config: Config, relay_option: RelayOptions) -> Router { relay_option, }; - let context = Context::new(config.clone()).await; - tokio::spawn(async move { loop_running(context).await }); Router::new() .nest("/api/v1", routers().with_state(state)) .layer(ServiceBuilder::new().layer(CorsLayer::new().allow_origin(Any))) @@ -85,17 +63,16 @@ pub async fn app(config: Config, relay_option: RelayOptions) -> Router { } pub fn routers() -> Router { - let router = Router::new() - .route("/hello", get(hello)) - .route("/certificate", get(certificate)) - .route("/ping", get(ping)) - .route("/node_list", get(node_list)) - .route("/repo_provide", post(repo_provide)) - .route("/repo_list", get(repo_list)) - .route("/test/send", get(send_message)) - .route("/lfs_share", post(lfs_share)) - .route("/lfs_list", get(lfs_list)) - .route("/lfs_chunk", get(lfs_chunk)); + let router = Router::new().route("/hello", get(hello)); + // .route("/certificate", get(certificate)) + // .route("/ping", get(ping)) + // .route("/node_list", get(node_list)) + // .route("/repo_provide", post(repo_provide)) + // .route("/repo_list", get(repo_list)) + // .route("/test/send", get(send_message)) + // .route("/lfs_share", post(lfs_share)) + // .route("/lfs_list", get(lfs_list)) + // .route("/lfs_chunk", get(lfs_chunk)); Router::new() .merge(router) @@ -106,270 +83,212 @@ async fn hello() -> Result { Ok(Json("hello relay")) } -async fn certificate( - Query(query): Query, - state: State, -) -> Result, (StatusCode, String)> { - let option = state.relay_option.clone(); - if query.name.is_none() { - return Err((StatusCode::BAD_REQUEST, "not enough paras".to_string())); - } - let name = query.name.unwrap(); - - let ztm: LocalHub = LocalHub { - hub_host: option.hub_host, - hub_port: option.ztm_hub_port, - ca_port: option.ca_port, - }; - let permit = match ztm.create_ztm_certificate(name.clone()).await { - Ok(p) => p, - Err(e) => { - return Err((StatusCode::INTERNAL_SERVER_ERROR, e)); - } - }; - Ok(Json(permit)) -} - -pub async fn ping( - Query(query): Query, - state: State, -) -> Result, (StatusCode, String)> { - let storage = state.context.services.ztm_storage.clone(); - let node: ztm_node::Model = match query.try_into() { - Ok(n) => n, - Err(_) => { - return Err((StatusCode::BAD_REQUEST, "invalid paras".to_string())); - } - }; - match storage.insert_or_update_node(node).await { - Ok(_) => Ok(Json(RelayResultRes { success: true })), - Err(_) => Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "invalid paras".to_string(), - )), - } -} - -pub async fn node_list( - Query(_query): Query, - state: State, -) -> Result>, (StatusCode, String)> { - let storage = state.context.services.ztm_storage.clone(); - let nodelist: Vec = storage - .get_all_node() - .await - .unwrap() - .into_iter() - .map(|x| x.into()) - .collect(); - Ok(Json(nodelist)) -} - -pub async fn repo_provide( - state: State, - Json(repo_info): Json, -) -> Result, (StatusCode, String)> { - if repo_info.identifier.is_empty() { - return Err((StatusCode::BAD_REQUEST, "paras invalid".to_string())); - } - let repo_info_model: ztm_repo_info::Model = repo_info.into(); - let storage = state.context.services.ztm_storage.clone(); - match storage.insert_or_update_repo_info(repo_info_model).await { - Ok(_) => Ok(Json(RelayResultRes { success: true })), - Err(_) => Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "invalid paras".to_string(), - )), - } -} - -pub async fn repo_list( - Query(_query): Query, - state: State, -) -> Result>, (StatusCode, String)> { - let storage = state.context.services.ztm_storage.clone(); - let repo_info_list: Vec = storage - .get_all_repo_info() - .await - .unwrap() - .into_iter() - .map(|x| x.into()) - .collect(); - let nodelist: Vec = storage - .get_all_node() - .await - .unwrap() - .into_iter() - .map(|x| x.into()) - .collect(); - let mut repo_info_list_result = vec![]; - for mut repo in repo_info_list { - for node in &nodelist { - if repo.origin == node.peer_id { - repo.peer_online = node.online; - } - } - repo_info_list_result.push(repo.clone()); - } - Ok(Json(repo_info_list_result)) -} - -pub async fn lfs_share( - state: State, - Json(lfs_info): Json, -) -> Result, (StatusCode, String)> { - let ztm_lfs_model: ztm_lfs_info::Model = lfs_info.into(); - let storage = state.context.services.ztm_storage.clone(); - match storage.insert_lfs_info(ztm_lfs_model).await { - Ok(_) => Ok(Json(RelayResultRes { success: true })), - Err(_) => Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "invalid paras".to_string(), - )), - } -} - -pub async fn lfs_list( - Query(_query): Query, - state: State, -) -> Result>, (StatusCode, String)> { - let lfs_info_list_result = lfs_list_handler(state).await; - Ok(Json(lfs_info_list_result)) -} - -async fn lfs_list_handler(state: State) -> Vec { - let storage = state.context.services.ztm_storage.clone(); - let lfs_info_list: Vec = storage - .get_all_lfs_info() - .await - .unwrap() - .into_iter() - .map(|x| x.into()) - .collect(); - let nodelist: Vec = storage - .get_all_node() - .await - .unwrap() - .into_iter() - .map(|x| x.into()) - .collect(); - let mut lfs_info_list_result = vec![]; - for mut lfs in lfs_info_list { - for node in &nodelist { - if lfs.peer_id == node.peer_id { - lfs.peer_online = node.online; - } - } - lfs_info_list_result.push(lfs.clone()); - } - lfs_info_list_result -} - -async fn send_message( - Query(query): Query>, - state: State, -) -> Result, (StatusCode, String)> { - let ztm_agent_port = state.relay_option.ztm_agent_port; - let peer_id = match query.get("peer_id") { - Some(i) => i.to_string(), - None => { - return Err(( - StatusCode::BAD_REQUEST, - String::from("peer_id not provide\n"), - )); - } - }; - let path = match query.get("path") { - Some(i) => i.to_string(), - None => { - return Err((StatusCode::BAD_REQUEST, String::from("path not provide\n"))); - } - }; - let result = match send_get_request_to_peer_by_tunnel(ztm_agent_port, peer_id, path).await { - Ok(s) => s, - Err(e) => { - tracing::error!(e); - return Err((StatusCode::INTERNAL_SERVER_ERROR, e)); - } - }; +// pub async fn ping( +// Query(query): Query, +// state: State, +// ) -> Result, (StatusCode, String)> { +// let storage = state.context.services.ztm_storage.clone(); +// let node: ztm_node::Model = match query.try_into() { +// Ok(n) => n, +// Err(_) => { +// return Err((StatusCode::BAD_REQUEST, "invalid paras".to_string())); +// } +// }; +// match storage.insert_or_update_node(node).await { +// Ok(_) => Ok(Json(RelayResultRes { success: true })), +// Err(_) => Err(( +// StatusCode::INTERNAL_SERVER_ERROR, +// "invalid paras".to_string(), +// )), +// } +// } - Ok(Json(result)) -} +// pub async fn node_list( +// Query(_query): Query, +// state: State, +// ) -> Result>, (StatusCode, String)> { +// let storage = state.context.services.ztm_storage.clone(); +// let nodelist: Vec = storage +// .get_all_node() +// .await +// .unwrap() +// .into_iter() +// .map(|x| x.into()) +// .collect(); +// Ok(Json(nodelist)) +// } -pub async fn lfs_chunk( - Query(query): Query, - state: State, -) -> Result, (StatusCode, String)> { - if query.file_hash.is_none() { - return Err((StatusCode::BAD_REQUEST, "not enough paras".to_string())); - } - let file_hash = query.file_hash.unwrap().clone(); +// pub async fn repo_provide( +// state: State, +// Json(repo_info): Json, +// ) -> Result, (StatusCode, String)> { +// if repo_info.identifier.is_empty() { +// return Err((StatusCode::BAD_REQUEST, "paras invalid".to_string())); +// } +// let repo_info_model: ztm_repo_info::Model = repo_info.into(); +// let storage = state.context.services.ztm_storage.clone(); +// match storage.insert_or_update_repo_info(repo_info_model).await { +// Ok(_) => Ok(Json(RelayResultRes { success: true })), +// Err(_) => Err(( +// StatusCode::INTERNAL_SERVER_ERROR, +// "invalid paras".to_string(), +// )), +// } +// } - let lfs_object = state - .context - .services - .lfs_db_storage - .get_lfs_object(file_hash.clone()) - .await - .unwrap(); +// pub async fn repo_list( +// Query(_query): Query, +// state: State, +// ) -> Result>, (StatusCode, String)> { +// let storage = state.context.services.ztm_storage.clone(); +// let repo_info_list: Vec = storage +// .get_all_repo_info() +// .await +// .unwrap() +// .into_iter() +// .map(|x| x.into()) +// .collect(); +// let nodelist: Vec = storage +// .get_all_node() +// .await +// .unwrap() +// .into_iter() +// .map(|x| x.into()) +// .collect(); +// let mut repo_info_list_result = vec![]; +// for mut repo in repo_info_list { +// for node in &nodelist { +// if repo.origin == node.peer_id { +// repo.peer_online = node.online; +// } +// } +// repo_info_list_result.push(repo.clone()); +// } +// Ok(Json(repo_info_list_result)) +// } - if lfs_object.is_none() { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "lfs chunk info not found".to_string(), - )); - } +// pub async fn lfs_share( +// state: State, +// Json(lfs_info): Json, +// ) -> Result, (StatusCode, String)> { +// let ztm_lfs_model: ztm_lfs_info::Model = lfs_info.into(); +// let storage = state.context.services.ztm_storage.clone(); +// match storage.insert_lfs_info(ztm_lfs_model).await { +// Ok(_) => Ok(Json(RelayResultRes { success: true })), +// Err(_) => Err(( +// StatusCode::INTERNAL_SERVER_ERROR, +// "invalid paras".to_string(), +// )), +// } +// } - let mut lfs_object_chunks: Vec = state - .context - .services - .lfs_db_storage - .get_lfs_relations(file_hash) - .await - .unwrap() - .iter() - .map(|x| x.clone().into()) - .collect(); +// pub async fn lfs_list( +// Query(_query): Query, +// state: State, +// ) -> Result>, (StatusCode, String)> { +// let lfs_info_list_result = lfs_list_handler(state).await; +// Ok(Json(lfs_info_list_result)) +// } - let mut lfs_info_res: LFSInfoRes = lfs_object.unwrap().into(); - lfs_info_res.chunks.append(&mut lfs_object_chunks); +// async fn lfs_list_handler(state: State) -> Vec { +// let storage = state.context.services.ztm_storage.clone(); +// let lfs_info_list: Vec = storage +// .get_all_lfs_info() +// .await +// .unwrap() +// .into_iter() +// .map(|x| x.into()) +// .collect(); +// let nodelist: Vec = storage +// .get_all_node() +// .await +// .unwrap() +// .into_iter() +// .map(|x| x.into()) +// .collect(); +// let mut lfs_info_list_result = vec![]; +// for mut lfs in lfs_info_list { +// for node in &nodelist { +// if lfs.peer_id == node.peer_id { +// lfs.peer_online = node.online; +// } +// } +// lfs_info_list_result.push(lfs.clone()); +// } +// lfs_info_list_result +// } - Ok(Json(lfs_info_res)) -} +// async fn send_message( +// Query(query): Query>, +// state: State, +// ) -> Result, (StatusCode, String)> { +// let ztm_agent_port = state.relay_option.ztm_agent_port; +// let peer_id = match query.get("peer_id") { +// Some(i) => i.to_string(), +// None => { +// return Err(( +// StatusCode::BAD_REQUEST, +// String::from("peer_id not provide\n"), +// )); +// } +// }; +// let path = match query.get("path") { +// Some(i) => i.to_string(), +// None => { +// return Err((StatusCode::BAD_REQUEST, String::from("path not provide\n"))); +// } +// }; +// let result = match send_get_request_to_peer_by_tunnel(ztm_agent_port, peer_id, path).await { +// Ok(s) => s, +// Err(e) => { +// tracing::error!(e); +// return Err((StatusCode::INTERNAL_SERVER_ERROR, e)); +// } +// }; -async fn loop_running(context: Context) { - let mut interval = tokio::time::interval(Duration::from_secs(60)); +// Ok(Json(result)) +// } - loop { - check_nodes_online(context.clone()).await; - // ping_self(context.clone()).await; - interval.tick().await; - } -} +// pub async fn lfs_chunk( +// Query(query): Query, +// state: State, +// ) -> Result, (StatusCode, String)> { +// if query.file_hash.is_none() { +// return Err((StatusCode::BAD_REQUEST, "not enough paras".to_string())); +// } +// let file_hash = query.file_hash.unwrap().clone(); + +// let lfs_object = state +// .context +// .services +// .lfs_db_storage +// .get_lfs_object(file_hash.clone()) +// .await +// .unwrap(); + +// if lfs_object.is_none() { +// return Err(( +// StatusCode::INTERNAL_SERVER_ERROR, +// "lfs chunk info not found".to_string(), +// )); +// } -async fn check_nodes_online(context: Context) { - let storage = context.services.ztm_storage.clone(); - let nodelist: Vec = - storage.get_all_node().await.unwrap().into_iter().collect(); - for mut node in nodelist { - if !node.online { - continue; - } - //check online - let from_timestamp = Duration::from_millis(node.last_online_time as u64); - let now = SystemTime::now(); - let elapsed = match now.duration_since(SystemTime::UNIX_EPOCH) { - Ok(dur) => dur, - Err(_) => { - continue; - } - }; - if elapsed.as_secs() > from_timestamp.as_secs() + 60 { - node.online = false; - storage.update_node(node.clone()).await.unwrap(); - } - } -} +// let mut lfs_object_chunks: Vec = state +// .context +// .services +// .lfs_db_storage +// .get_lfs_relations(file_hash) +// .await +// .unwrap() +// .iter() +// .map(|x| x.clone().into()) +// .collect(); + +// let mut lfs_info_res: LFSInfoRes = lfs_object.unwrap().into(); +// lfs_info_res.chunks.append(&mut lfs_object_chunks); + +// Ok(Json(lfs_info_res)) +// } // async fn ping_self(context: Context) { // let storage = context.services.ztm_storage.clone(); diff --git a/cert.der b/cert.der new file mode 100644 index 0000000000000000000000000000000000000000..1d003571265b59c9bc149b8928634206445da2e1 GIT binary patch literal 355 zcmXqLVvILvVq{ss%*4pVB;uJgC2wm9BilLedmC-I_CAkdu9h?4V&l+i^EhYA!pvl# zXee(W%f=ka!ptL9l$@TLr%;@llcrFdnVy%LqL7?gRAQhY&TDRJU|k4&7@^iPGqN)~F|b&!wt3xGcVpg{*O%06qzhK9Z`!a+pt#g-i|m3A zyb7_EhYxCKpF10{dqLV1@%hyqJ)0lQ*=tZSW3t%f?O#v2edSmzVIXE80`!8cAPcVn zcN1q$esW??Mt*S#vPYRc84TQ*6d8^lTd!U4@yF8*8t=Sw_i#9E*6zTme}CGFsaKu7T`Tg1kFJmd08s;WY5)KL literal 0 HcmV?d00001 diff --git a/common/src/model.rs b/common/src/model.rs index ece39aa1..a47e2dea 100644 --- a/common/src/model.rs +++ b/common/src/model.rs @@ -8,15 +8,9 @@ pub struct CommonOptions { } #[derive(Args, Clone, Debug)] -pub struct ZtmOptions { - #[arg(long, default_value_t = 7777)] - pub ztm_agent_port: u16, - +pub struct P2pOptions { #[arg(long)] pub bootstrap_node: Option, - - #[arg(long, default_value_t = false)] - pub cache: bool, } #[derive(Deserialize, Debug)] diff --git a/gateway/src/api/mod.rs b/gateway/src/api/mod.rs index bab02a7a..74e2058c 100644 --- a/gateway/src/api/mod.rs +++ b/gateway/src/api/mod.rs @@ -1,14 +1,14 @@ -use common::model::ZtmOptions; +use common::model::P2pOptions; use mono::api::MonoApiServiceState; pub mod github_router; +mod model; pub mod nostr_router; pub mod ztm_router; -mod model; #[derive(Clone)] pub struct MegaApiServiceState { pub inner: MonoApiServiceState, pub port: u16, - pub ztm: ZtmOptions, + pub p2p: P2pOptions, } diff --git a/gateway/src/api/nostr_router.rs b/gateway/src/api/nostr_router.rs index a2dd35f8..670a88a5 100644 --- a/gateway/src/api/nostr_router.rs +++ b/gateway/src/api/nostr_router.rs @@ -123,7 +123,7 @@ async fn send( .unwrap() .unwrap(); - let bootstrap_node = match state.ztm.bootstrap_node.clone() { + let bootstrap_node = match state.p2p.bootstrap_node.clone() { Some(b) => b, None => { return Err(( diff --git a/gateway/src/api/ztm_router.rs b/gateway/src/api/ztm_router.rs index f8fa655b..316ced3c 100644 --- a/gateway/src/api/ztm_router.rs +++ b/gateway/src/api/ztm_router.rs @@ -1,125 +1,121 @@ -use std::collections::HashMap; +// use std::collections::HashMap; -use axum::{ - extract::{Query, State}, - http::StatusCode, - routing::{get, post}, - Json, Router, -}; +// use axum::{ +// extract::{Query, State}, +// http::StatusCode, +// routing::{get, post}, +// Json, Router, +// }; -use callisto::ztm_path_mapping; -use common::model::CommonResult; -use gemini::nostr::subscribe_git_event; -use vault::get_peerid; +// use callisto::ztm_path_mapping; +// use common::model::CommonResult; +// use gemini::nostr::subscribe_git_event; +// use vault::get_peerid; -use crate::api::model::RepoProvideQuery; -use crate::api::MegaApiServiceState; +// use crate::api::model::RepoProvideQuery; +// use crate::api::MegaApiServiceState; -pub fn routers() -> Router { - Router::new() - .route("/ztm/repo_provide", post(repo_provide)) - .route("/ztm/repo_fork", get(repo_fork)) - .route("/ztm/peer_id", get(peer_id)) - .route("/ztm/alias_to_path", get(alias_to_path)) -} +// pub fn routers() -> Router { +// Router::new() +// .route("/ztm/repo_provide", post(repo_provide)) +// .route("/ztm/repo_fork", get(repo_fork)) +// .route("/ztm/peer_id", get(peer_id)) +// .route("/ztm/alias_to_path", get(alias_to_path)) +// } -async fn repo_provide( - state: State, - Json(json): Json, -) -> Result>, (StatusCode, String)> { - let bootstrap_node = match state.ztm.bootstrap_node.clone() { - Some(b) => b.clone(), - None => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - String::from("Bootstrap node not provide\n"), - )); - } - }; - let RepoProvideQuery { path, alias } = json.clone(); - let context = state.inner.context.clone(); - let model: ztm_path_mapping::Model = json.into(); - context - .services - .ztm_storage - .save_alias_mapping(model.clone()) - .await - .map_err(|_| (StatusCode::BAD_REQUEST, String::from("Invalid Params")))?; - let res = match gemini::http::handler::repo_provide( - bootstrap_node, - state.inner.context.clone(), - path, - alias, - get_peerid(), - ) - .await - { - Ok(s) => CommonResult::success(Some(s)), - Err(err) => CommonResult::failed(err.as_str()), - }; - Ok(Json(res)) -} +// async fn repo_provide( +// state: State, +// Json(json): Json, +// ) -> Result>, (StatusCode, String)> { +// let bootstrap_node = match state.p2p.bootstrap_node.clone() { +// Some(b) => b.clone(), +// None => { +// return Err(( +// StatusCode::INTERNAL_SERVER_ERROR, +// String::from("Bootstrap node not provide\n"), +// )); +// } +// }; +// let RepoProvideQuery { path, alias } = json.clone(); +// let context = state.inner.context.clone(); +// let model: ztm_path_mapping::Model = json.into(); +// context +// .services +// .ztm_storage +// .save_alias_mapping(model.clone()) +// .await +// .map_err(|_| (StatusCode::BAD_REQUEST, String::from("Invalid Params")))?; +// let res = match gemini::http::handler::repo_provide( +// bootstrap_node, +// state.inner.context.clone(), +// path, +// alias, +// get_peerid(), +// ) +// .await +// { +// Ok(s) => CommonResult::success(Some(s)), +// Err(err) => CommonResult::failed(err.as_str()), +// }; +// Ok(Json(res)) +// } -async fn repo_fork( - Query(query): Query>, - state: State, -) -> Result>, (StatusCode, String)> { - let identifier = match query.get("identifier") { - Some(i) => i, - None => { - return Err(( - StatusCode::BAD_REQUEST, - String::from("Identifier not provide\n"), - )); - } - }; +// async fn repo_fork( +// Query(query): Query>, +// state: State, +// ) -> Result>, (StatusCode, String)> { +// let identifier = match query.get("identifier") { +// Some(i) => i, +// None => { +// return Err(( +// StatusCode::BAD_REQUEST, +// String::from("Identifier not provide\n"), +// )); +// } +// }; - let res = gemini::http::handler::repo_folk_alias( - state.ztm.ztm_agent_port, - identifier.clone().to_string(), - ) - .await; - let res = match res { - Ok(data) => CommonResult::success(Some(data)), - Err(err) => CommonResult::failed(&err.to_string()), - }; +// let res = gemini::http::handler::repo_folk_alias(7777, identifier.clone().to_string()).await; +// let res = match res { +// Ok(data) => CommonResult::success(Some(data)), +// Err(err) => CommonResult::failed(&err.to_string()), +// }; - //nostr subscribe to Events - if let Some(bootstrap_node) = state.ztm.bootstrap_node.clone() { - let _ = subscribe_git_event(identifier.to_string(), get_peerid(), bootstrap_node).await; - } +// //nostr subscribe to Events +// if let Some(bootstrap_node) = state.p2p.bootstrap_node.clone() { +// let _ = subscribe_git_event(identifier.to_string(), get_peerid(), bootstrap_node).await; +// } - Ok(Json(res)) -} +// Ok(Json(res)) +// } -async fn peer_id( - Query(_query): Query>, - _state: State, -) -> Result>, (StatusCode, String)> { - let (peer_id, _) = vault::init(); - Ok(Json(CommonResult::success(Some(peer_id)))) -} +// async fn peer_id( +// Query(_query): Query>, +// _state: State, +// ) -> Result>, (StatusCode, String)> { +// let (peer_id, _) = vault::init(); +// Ok(Json(CommonResult::success(Some(peer_id)))) +// } -async fn alias_to_path( - Query(query): Query>, - state: State, -) -> Result>, (StatusCode, String)> { - let context = state.inner.context.clone(); - let alias = match query.get("alias") { - Some(str) => str, - None => { - return Err((StatusCode::BAD_REQUEST, String::from("Alias not provide\n"))); - } - }; - let res: Option = context - .services - .ztm_storage - .get_path_from_alias(alias) - .await - .map_err(|_| (StatusCode::BAD_REQUEST, String::from("Invalid Params")))?; - if let Some(res) = res { - Ok(Json(CommonResult::success(Some(res.repo_path)))) - } else { - Err((StatusCode::BAD_REQUEST, String::from("Alias not found\n"))) - } -} +// async fn alias_to_path( +// Query(query): Query>, +// state: State, +// ) -> Result>, (StatusCode, String)> { +// let context = state.inner.context.clone(); +// let alias = match query.get("alias") { +// Some(str) => str, +// None => { +// return Err((StatusCode::BAD_REQUEST, String::from("Alias not provide\n"))); +// } +// }; +// let res: Option = context +// .services +// .ztm_storage +// .get_path_from_alias(alias) +// .await +// .map_err(|_| (StatusCode::BAD_REQUEST, String::from("Invalid Params")))?; +// if let Some(res) = res { +// Ok(Json(CommonResult::success(Some(res.repo_path)))) +// } else { +// Err((StatusCode::BAD_REQUEST, String::from("Alias not found\n"))) +// } +// } diff --git a/gateway/src/https_server.rs b/gateway/src/https_server.rs index d4eaf0b7..9fd526e5 100644 --- a/gateway/src/https_server.rs +++ b/gateway/src/https_server.rs @@ -14,7 +14,7 @@ use tower_http::cors::{Any, CorsLayer}; use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; -use common::model::{CommonOptions, ZtmOptions}; +use common::model::{CommonOptions, P2pOptions}; use gemini::ztm::agent::{run_ztm_client, LocalZTMAgent}; use jupiter::context::Context; use mono::api::lfs::lfs_router; @@ -29,7 +29,7 @@ pub struct HttpOptions { pub common: CommonOptions, #[clap(flatten)] - pub ztm: ZtmOptions, + pub p2p: P2pOptions, #[arg(long, default_value_t = 8000)] pub http_port: u16, @@ -41,7 +41,7 @@ pub struct HttpsOptions { pub common: CommonOptions, #[clap(flatten)] - pub ztm: ZtmOptions, + pub p2p: P2pOptions, #[arg(long, default_value_t = 443)] pub https_port: u16, @@ -59,17 +59,17 @@ pub async fn https_server(context: Context, options: HttpsOptions) { https_key_path, https_cert_path, https_port, - ztm, + p2p, } = options.clone(); - check_run_with_ztm(context.clone(), options.ztm.clone(), https_port); + check_run_with_p2p(context.clone(), options.p2p.clone()); let app = app( context, host.clone(), https_port, options.common.clone(), - ztm.clone(), + p2p.clone(), ) .await; @@ -88,17 +88,17 @@ pub async fn http_server(context: Context, options: HttpOptions) { let HttpOptions { common: CommonOptions { host, .. }, http_port, - ztm, + p2p, } = options.clone(); - check_run_with_ztm(context.clone(), options.ztm.clone(), http_port); + check_run_with_p2p(context.clone(), options.p2p.clone()); let app = app( context, host.clone(), http_port, options.common.clone(), - ztm.clone(), + p2p.clone(), ) .await; @@ -116,7 +116,7 @@ pub async fn app( host: String, port: u16, common: CommonOptions, - ztm: ZtmOptions, + p2p: P2pOptions, ) -> Router { let state = AppState { host, @@ -132,7 +132,7 @@ pub async fn app( oauth_client: None, store: None, }, - ztm, + p2p, port, }; @@ -145,7 +145,7 @@ pub async fn app( pub fn mega_routers() -> Router { Router::new() - .merge(ztm_router::routers()) + // .merge(ztm_router::routers()) .merge(nostr_router::routers()) .merge(github_router::routers()) } @@ -179,41 +179,16 @@ pub async fn app( .with_state(state) } -pub fn check_run_with_ztm(context: Context, ztm: ZtmOptions, http_port: u16) { +pub fn check_run_with_p2p(_context: Context, p2p: P2pOptions) { //Mega server join a ztm mesh - match ztm.bootstrap_node { + match p2p.bootstrap_node { Some(bootstrap_node) => { tracing::info!( - "The bootstrap node is {}, prepare to join ztm network", + "The bootstrap node is {}, prepare to join p2p network", bootstrap_node.clone() ); - let (peer_id, _) = vault::init(); - let ztm_agent: LocalZTMAgent = LocalZTMAgent { - agent_port: ztm.ztm_agent_port, - }; - ztm_agent.clone().start_ztm_agent(); - thread::sleep(time::Duration::from_secs(3)); - - let bootstrap_node_clone = bootstrap_node.clone(); - let config_clone = context.config.clone(); - let ztm_agent_clone = ztm_agent.clone(); - tokio::spawn(async move { - run_ztm_client( - bootstrap_node_clone, - config_clone, - peer_id, - ztm_agent_clone, - http_port, - ) - .await - }); - - if ztm.cache { - thread::sleep(time::Duration::from_secs(3)); - tokio::spawn(async move { - cache_public_repo_and_lfs(bootstrap_node, context, ztm_agent, http_port).await - }); - } + + tokio::spawn(async move { gemini::p2p::client::run(bootstrap_node).await }); } None => { tracing::info!("The bootstrap node is not set, prepare to start mega server locally"); diff --git a/gemini/Cargo.toml b/gemini/Cargo.toml index e2e8b567..55e74792 100644 --- a/gemini/Cargo.toml +++ b/gemini/Cargo.toml @@ -28,3 +28,9 @@ secp256k1 = { workspace = true, features = ["serde", "rand", "hashes"] } ring = "0.17.8" hex = { workspace = true } async-trait = { workspace = true } +quinn = "0.11" +anyhow = { workspace = true } +lazy_static = { workspace = true } +rcgen = "0.13" +dashmap = "6.0.1" +uuid = { workspace = true } \ No newline at end of file diff --git a/gemini/src/lib.rs b/gemini/src/lib.rs index 9d9b4fcd..3372d2fd 100644 --- a/gemini/src/lib.rs +++ b/gemini/src/lib.rs @@ -11,6 +11,7 @@ pub mod cache; pub mod http; pub mod lfs; pub mod nostr; +pub mod p2p; pub mod util; pub mod ztm; diff --git a/gemini/src/p2p/client.rs b/gemini/src/p2p/client.rs new file mode 100644 index 00000000..6c8b410d --- /dev/null +++ b/gemini/src/p2p/client.rs @@ -0,0 +1,107 @@ +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::anyhow; +use anyhow::{bail, Context, Result}; +use quinn::crypto::rustls::QuicClientConfig; +use quinn::{rustls, ClientConfig, Connection, Endpoint}; +use tokio::sync::mpsc; +use tracing::info; +use uuid::Uuid; + +use crate::p2p::relay::ReceiveData; +use crate::p2p::Action; + +use super::{get_certificate, ALPN_QUIC_HTTP}; + +pub async fn run(bootstrap_node: String) -> Result<()> { + info!("Start"); + rustls::crypto::ring::default_provider() + .install_default() + .expect("Failed to install rustls crypto provider"); + + let connection = get_client_connection(bootstrap_node).await?; + + let remote_address = connection.remote_address(); + let stable_id = connection.stable_id(); + info!("Established connection: {remote_address:#?},{stable_id:#?}"); + + let connection = Arc::new(connection); + let connection_clone = connection.clone(); + + let (tx, mut rx) = mpsc::channel(8); + + let peer_id = vault::get_peerid(); + + tokio::spawn(async move { + loop { + let (mut quic_send, _) = connection_clone.open_bi().await.unwrap(); + let ping = ReceiveData { + from: peer_id.clone(), + data: vec![], + func: "".to_string(), + action: Action::Ping, + to: "".to_string(), + req_id: Uuid::new_v4().into(), + }; + let json = serde_json::to_string(&ping).unwrap(); + quic_send.write_all(json.as_ref()).await.unwrap(); + quic_send.finish().unwrap(); + tokio::time::sleep(Duration::from_secs(20)).await; + } + }); + + let connection_clone = connection.clone(); + tokio::spawn(async move { + loop { + let (_, mut quic_recv) = connection_clone.accept_bi().await.unwrap(); + let buffer = quic_recv.read_to_end(1024 * 1024).await.unwrap(); + info!("QUIC Received:\n{}", String::from_utf8_lossy(&*buffer)); + if tx.send(buffer).await.is_err() { + info!("Receiver closed"); + return; + } + } + }); + + while let Some(message) = rx.recv().await { + //TODO with the message + info!( + "Channel Received message: {}", + String::from_utf8_lossy(&message) + ); + } + + Ok(()) +} + +pub async fn get_client_connection(bootstrap_node: String) -> anyhow::Result { + let (certs, _key) = get_certificate().await?; + + let mut roots = rustls::RootCertStore::empty(); + + for ele in certs { + roots.add(ele)?; + } + + let mut client_crypto = rustls::ClientConfig::builder() + .with_root_certificates(roots) + .with_no_client_auth(); + + client_crypto.alpn_protocols = ALPN_QUIC_HTTP.iter().map(|&x| x.into()).collect(); + let client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(client_crypto)?)); + info!("Connection"); + let mut endpoint = Endpoint::client(SocketAddr::from_str("127.0.0.1:0").unwrap())?; + info!("Connection2"); + endpoint.set_default_client_config(client_config); + + let server_addr: SocketAddr = bootstrap_node.parse()?; + let conn = endpoint + .connect(server_addr, "localhost")? + .await + .map_err(|e| anyhow!("failed to connect: {}", e))?; + info!("Connection3"); + Ok(conn) +} diff --git a/gemini/src/p2p/mod.rs b/gemini/src/p2p/mod.rs new file mode 100644 index 00000000..1a92246e --- /dev/null +++ b/gemini/src/p2p/mod.rs @@ -0,0 +1,68 @@ +use std::{fmt, fs, io}; + +use anyhow::{bail, Context}; +use quinn::rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use serde::{Deserialize, Serialize}; +use tracing::info; + +pub mod client; +pub mod relay; + +pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; + +#[derive(Serialize, Deserialize, Debug)] +pub enum Action { + Ping, + Send, + Call, + Callback, +} + +impl fmt::Display for Action { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Action::Ping => { + write!(f, "Ping") + } + Action::Send => { + write!(f, "Send") + } + Action::Call => { + write!(f, "Call") + } + Action::Callback => { + write!(f, "Callback") + } + } + } +} + +pub async fn get_certificate( +) -> anyhow::Result<(Vec>, PrivateKeyDer<'static>)> { + let (certs, key) = { + let cert_path = "cert.der"; + let key_path = "key.der"; + let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) { + Ok((cert, key)) => ( + CertificateDer::from(cert), + PrivateKeyDer::try_from(key).map_err(anyhow::Error::msg)?, + ), + Err(ref e) if e.kind() == io::ErrorKind::NotFound => { + info!("generating self-signed certificate"); + let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); + let key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); + let cert = cert.cert.into(); + fs::write(&cert_path, &cert).context("failed to write certificate")?; + fs::write(&key_path, key.secret_pkcs8_der()) + .context("failed to write private key")?; + (cert, key.into()) + } + Err(e) => { + bail!("failed to read certificate: {}", e); + } + }; + + (vec![cert], key) + }; + Ok((certs, key)) +} diff --git a/gemini/src/p2p/relay.rs b/gemini/src/p2p/relay.rs new file mode 100644 index 00000000..49f06e5e --- /dev/null +++ b/gemini/src/p2p/relay.rs @@ -0,0 +1,257 @@ +use anyhow::anyhow; +use anyhow::{bail, Context, Result}; +use dashmap::DashMap; +use lazy_static::lazy_static; +use quinn::{ + crypto::rustls::QuicServerConfig, + rustls::{ + self, + pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}, + }, + RecvStream, SendStream, +}; +use serde::{Deserialize, Serialize}; +use std::{fs, io, net::SocketAddr, str::FromStr, sync::Arc}; +use tracing::{error, info, info_span, Instrument}; + +use crate::p2p::{get_certificate, ALPN_QUIC_HTTP}; + +use super::Action; + +lazy_static! { + static ref Session: DashMap> = DashMap::new(); + static ref REQ_ID_MAP: DashMap> = DashMap::new(); +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ReceiveData { + pub from: String, + pub data: Vec, + pub func: String, + pub action: Action, + pub to: String, + pub req_id: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SenderData { + pub from: String, + pub data: Vec, + pub func: String, + pub err: String, + pub to: String, + pub req_id: String, +} + +pub async fn run(host: String, port: u16) -> Result<()> { + rustls::crypto::ring::default_provider() + .install_default() + .expect("Failed to install rustls crypto provider"); + + let (certs, key) = get_certificate().await?; + + let mut server_crypto = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key)?; + server_crypto.alpn_protocols = ALPN_QUIC_HTTP.iter().map(|&x| x.into()).collect(); + + let server_config = + quinn::ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(server_crypto)?)); + + let addr = format!("{}:{}", host, port); + let endpoint = + quinn::Endpoint::server(server_config, SocketAddr::from_str(addr.as_str()).unwrap())?; + info!("listening on {}", endpoint.local_addr()?); + + while let Some(conn) = endpoint.accept().await { + { + info!("accepting connection"); + let fut = handle_connection(conn); + tokio::spawn(async move { + if let Err(e) = fut.await { + error!("connection failed: {reason}", reason = e.to_string()) + } + }); + } + } + + Ok(()) +} + +async fn handle_connection(conn: quinn::Incoming) -> Result<()> { + let connection = conn.await?; + let span = info_span!( + "connection", + remote = %connection.remote_address(), + protocol = %connection + .handshake_data() + .unwrap() + .downcast::().unwrap() + .protocol + .map_or_else(|| "".into(), |x| String::from_utf8_lossy(&x).into_owned()) + ); + async { + let remote_address = connection.remote_address(); + let local_ip = connection.local_ip().unwrap(); + let stable_id = connection.stable_id(); + info!("established connection: {remote_address:#?},{local_ip:#?},{stable_id:#?}"); + let connection = Arc::new(connection); + + // Each stream initiated by the client constitutes a new request. + loop { + let connection_clone = connection.clone(); + let stream = connection_clone.accept_bi().await; + let stream = match stream { + Err(quinn::ConnectionError::ApplicationClosed { .. }) => { + info!("connection closed"); + return Ok(()); + } + Err(e) => { + info!("connection error:{}", e); + return Err(e); + } + Ok(s) => s, + }; + + // let fut = handle_request(sender.clone(), stream.1); + let connection_clone = connection.clone(); + let fut = handle_receive(stream.0, stream.1, connection_clone); + + tokio::spawn( + async move { + if let Err(e) = fut.await { + error!("failed: {reason}", reason = e.to_string()); + } + } + .instrument(info_span!("request")), + ); + } + } + .instrument(span) + .await?; + Ok(()) +} + +async fn handle_receive( + mut _sender: SendStream, + mut recv: RecvStream, + connection: Arc, +) -> anyhow::Result<()> { + let buffer_vec = recv.read_to_end(1024 * 10).await?; + if buffer_vec.is_empty() { + println!("QUIC Received is empty"); + return Ok(()); + } + let result = String::from_utf8_lossy(&*buffer_vec); + + let data: ReceiveData = match serde_json::from_str(&*result) { + Ok(data) => data, + Err(e) => { + error!("QUIC Received Error:\n{:?}", e); + return Err(anyhow!("QUIC Received Error:\n{:?}", e)); + } + }; + info!( + "QUIC Received Message from[{}], Action[{}]:\n", + data.from, data.action + ); + match data.action { + Action::Ping => { + Session.insert(data.from.clone(), connection.clone()); + let sender_data = SenderData { + from: "".to_string(), + data: "ok".as_bytes().to_vec(), + func: data.func.clone(), + err: "".to_string(), + to: data.from.to_string(), + req_id: data.req_id, + }; + let json = serde_json::to_string(&sender_data)?; + let (mut quic_send, _) = connection.clone().open_bi().await?; + quic_send.write_all(&json.as_bytes()).await?; + quic_send.finish()?; + } + Action::Send => { + let connection = match Session.get(data.to.as_str()) { + None => { + error!("Failed to find connection to {}", data.to); + return Err(anyhow!("Failed to find connection to {}", data.to)); + } + Some(conn) => conn, + }; + + let sender_data = SenderData { + from: data.from.to_string(), + data: data.data, + func: data.func.clone(), + err: "".to_string(), + to: data.to.to_string(), + req_id: data.req_id, + }; + let json = serde_json::to_string(&sender_data)?; + let (mut send, _) = connection.open_bi().await?; + send.write_all(&json.as_bytes()).await?; + send.finish()?; + } + Action::Call => { + { + let connection_to = match Session.get(data.to.as_str()) { + None => { + error!("Failed to find connection to {}", data.to); + return Err(anyhow!("Failed to find connection to {}", data.to)); + } + Some(conn) => conn, + }; + let sender_data = SenderData { + from: data.from.to_string(), + data: data.data, + func: data.func.clone(), + err: "".to_string(), + to: data.to.to_string(), + req_id: data.req_id.clone(), + }; + let json = serde_json::to_string(&sender_data)?; + let (mut send, _) = connection_to.open_bi().await?; + send.write_all(&json.as_bytes()).await?; + send.finish()?; + } + let from_connection = connection; + REQ_ID_MAP.insert(data.req_id.to_string(), from_connection.clone()); + Session.insert(data.from.clone(), from_connection.clone()); + } + Action::Callback => { + { + let connection = match REQ_ID_MAP.get(data.req_id.as_str()) { + None => { + error!("Failed to find connection req {}", data.req_id); + return Err(anyhow!("Failed to find connection req {}", data.req_id)); + } + Some(conn) => conn, + }; + let sender_data = SenderData { + from: data.from.to_string(), + data: data.data, + func: data.func.clone(), + err: "".to_string(), + to: data.to.to_string(), + req_id: data.req_id.clone(), + }; + let json = serde_json::to_string(&sender_data)?; + let (mut send, _) = connection.open_bi().await?; + send.write_all(&json.as_bytes()).await?; + send.finish()?; + } + REQ_ID_MAP.remove(data.req_id.as_str()); + } + } + + { + let peers: Vec = Session.iter().map(|entry| entry.key().clone()).collect(); + info!("Online peers num: {}", peers.len()); + for x in peers { + info!("Online peer: {}", x.to_string()); + } + } + + Ok(()) +} diff --git a/key.der b/key.der new file mode 100644 index 0000000000000000000000000000000000000000..ec58012813300300b5b1e8f569843a1de14d35d2 GIT binary patch literal 138 zcmV;50CoQ`frkPC05B5<2P%e0&OHJF1_&yKNX|V20S5$aFlzz<0R$jzepd$=ydslP zu8SG+_rRyTDK+OH!5kr7=M6k|*17+oL<2$q1Uaib>yCcdp7iU|COjK(s;`2ux)5`9 sKC~U6@C_hcdBeddE6&YOyP#&27oU5GjI-dJy)bx~lNOV=^~px{2mtXp3jhEB literal 0 HcmV?d00001 diff --git a/mega/src/commands/service/multi.rs b/mega/src/commands/service/multi.rs index 0116c354..12d0124b 100644 --- a/mega/src/commands/service/multi.rs +++ b/mega/src/commands/service/multi.rs @@ -5,7 +5,7 @@ use clap::{ArgMatches, Args, Command, FromArgMatches, ValueEnum}; use common::{ config::Config, errors::MegaResult, - model::{CommonOptions, ZtmOptions}, + model::{CommonOptions, P2pOptions}, }; use gateway::https_server::{self, HttpOptions, HttpsOptions}; use jupiter::context::Context; @@ -26,7 +26,7 @@ pub struct StartOptions { pub common: CommonOptions, #[clap(flatten)] - pub ztm: ZtmOptions, + pub p2p: P2pOptions, #[arg(long, default_value_t = 8000)] pub http_port: u16, @@ -60,14 +60,18 @@ pub(crate) async fn exec(config: Config, args: &ArgMatches) -> MegaResult { let service_type = server_matchers.service; let context = Context::new(config.clone()).await; - context.services.mono_storage.init_monorepo(&config.monorepo).await; + context + .services + .mono_storage + .init_monorepo(&config.monorepo) + .await; let context_clone = context.clone(); let http_server = if service_type.contains(&StartCommand::Http) { let http = HttpOptions { common: server_matchers.common.clone(), http_port: server_matchers.http_port, - ztm: server_matchers.ztm, + p2p: server_matchers.p2p, }; tokio::spawn(async move { https_server::http_server(context_clone, http).await }) } else if service_type.contains(&StartCommand::Https) { @@ -76,7 +80,7 @@ pub(crate) async fn exec(config: Config, args: &ArgMatches) -> MegaResult { https_port: server_matchers.https_port, https_key_path: server_matchers.https_key_path.unwrap(), https_cert_path: server_matchers.https_cert_path.unwrap(), - ztm: server_matchers.ztm, + p2p: server_matchers.p2p, }; tokio::spawn(async move { https_server::https_server(context_clone, https).await }) } else { From 0f1ce74b6f9096775cf240eacc8e549f5683f8dd Mon Sep 17 00:00:00 2001 From: wujian0327 <353981613@qq.com> Date: Thu, 20 Feb 2025 16:36:54 +0800 Subject: [PATCH 2/6] aa --- aries/src/service/api/ca_router.rs | 105 +++++++++++++++++++++++++++++ aries/src/service/api/mod.rs | 1 + aries/src/service/ca_server.rs | 3 + aries/src/service/relay_server.rs | 1 + gateway/src/api/nostr_router.rs | 30 +++++++++ gemini/src/p2p/client.rs | 71 +++++++++++++++++-- 6 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 aries/src/service/api/ca_router.rs diff --git a/aries/src/service/api/ca_router.rs b/aries/src/service/api/ca_router.rs new file mode 100644 index 00000000..184ac3b1 --- /dev/null +++ b/aries/src/service/api/ca_router.rs @@ -0,0 +1,105 @@ +use axum::{ + body::{to_bytes, Body}, + extract::{Query, State}, + http::{Request, Response, StatusCode, Uri}, + routing::get, + Router, +}; +use gemini::RelayGetParams; +use regex::Regex; + +use crate::service::relay_server::AppState; + +pub fn routers() -> Router { + Router::new().route( + "/{*path}", + get(get_method_router) + .post(post_method_router) + .delete(delete_method_router), + ) +} + +async fn get_method_router( + _state: State, + Query(_params): Query, + uri: Uri, +) -> Result, (StatusCode, String)> { + if Regex::new(r"/certificates/[a-zA-Z0-9]+$") + .unwrap() + .is_match(uri.path()) + { + let name = match gemini::ca::get_cert_name_from_path(uri.path()) { + Some(n) => n, + None => { + return gemini::ca::response_error( + StatusCode::BAD_REQUEST.as_u16(), + "Bad request".to_string(), + ) + } + }; + return gemini::ca::get_certificate(name).await; + } + Err(( + StatusCode::NOT_FOUND, + String::from("Operation not supported\n"), + )) +} + +async fn post_method_router( + _state: State, + uri: Uri, + req: Request, +) -> Result, (StatusCode, String)> { + if Regex::new(r"/certificates/[a-zA-Z0-9]+$") + .unwrap() + .is_match(uri.path()) + { + let name = match gemini::ca::get_cert_name_from_path(uri.path()) { + Some(n) => n, + None => { + return gemini::ca::response_error( + StatusCode::BAD_REQUEST.as_u16(), + "Bad request".to_string(), + ) + } + }; + return gemini::ca::issue_certificate(name).await; + } else if Regex::new(r"/sign/[a-zA-Z0-9]+$") + .unwrap() + .is_match(uri.path()) + { + let name = match gemini::ca::get_hub_name_from_path(uri.path()) { + Some(n) => n, + None => { + return gemini::ca::response_error( + StatusCode::BAD_REQUEST.as_u16(), + "Bad request".to_string(), + ) + } + }; + let bytes = to_bytes(req.into_body(), usize::MAX).await.unwrap(); + let pubkey = String::from_utf8(bytes.to_vec()).unwrap(); + return gemini::ca::sign_certificate(name, pubkey).await; + } + Err(( + StatusCode::NOT_FOUND, + String::from("Operation not supported\n"), + )) +} + +async fn delete_method_router( + _state: State, + uri: Uri, + _req: Request, +) -> Result, (StatusCode, String)> { + if Regex::new(r"/certificates/[a-zA-Z0-9]+$") + .unwrap() + .is_match(uri.path()) + { + return gemini::ca::delete_certificate(uri.path()).await; + } + Err(( + StatusCode::NOT_FOUND, + String::from("Operation not supported\n"), + )) +} diff --git a/aries/src/service/api/mod.rs b/aries/src/service/api/mod.rs index 13e3d62e..e854934c 100644 --- a/aries/src/service/api/mod.rs +++ b/aries/src/service/api/mod.rs @@ -1,4 +1,5 @@ pub mod nostr_router; +pub mod ca_router; #[cfg(test)] mod tests {} diff --git a/aries/src/service/ca_server.rs b/aries/src/service/ca_server.rs index 1c9158f5..26098eb3 100755 --- a/aries/src/service/ca_server.rs +++ b/aries/src/service/ca_server.rs @@ -41,7 +41,10 @@ pub async fn app(config: Config, host: String, port: u16) -> Router { port, context: Context::new(config.clone()).await, }; + routers(state) +} +pub fn routers(state: AppState) -> Router { Router::new() .route( "/*path", diff --git a/aries/src/service/relay_server.rs b/aries/src/service/relay_server.rs index fa0c1b2d..0b9e1d8c 100644 --- a/aries/src/service/relay_server.rs +++ b/aries/src/service/relay_server.rs @@ -77,6 +77,7 @@ pub fn routers() -> Router { Router::new() .merge(router) .merge(api::nostr_router::routers()) + .merge(api::ca_router::routers()) } async fn hello() -> Result { diff --git a/gateway/src/api/nostr_router.rs b/gateway/src/api/nostr_router.rs index 670a88a5..20b11a23 100644 --- a/gateway/src/api/nostr_router.rs +++ b/gateway/src/api/nostr_router.rs @@ -19,6 +19,7 @@ use crate::api::MegaApiServiceState; pub fn routers() -> Router { Router::new() .route("/nostr", post(recieve)) + .route("/nostr/quic/send_event", post(send_quic)) .route("/nostr/send_event", post(send)) .route("/nostr/event_list", get(event_list)) } @@ -150,6 +151,35 @@ async fn send( Ok(Json(CommonResult::success(None))) } +async fn send_quic( + state: State, + body: String, +) -> Result>, (StatusCode, String)> { + let bootstrap_node = match state.p2p.bootstrap_node.clone() { + Some(b) => b, + None => { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "bootstrap node is not set".to_string(), + )); + } + }; + + match gemini::p2p::client::send( + "1".to_string(), + "nostr_event".to_string(), + "hello".as_bytes().to_vec(), + bootstrap_node, + ) + .await + { + Ok(_) => {} + Err(_) => {} + } + + Ok(Json(CommonResult::success(None))) +} + pub async fn event_list( Query(_query): Query>, state: State, diff --git a/gemini/src/p2p/client.rs b/gemini/src/p2p/client.rs index 6c8b410d..1564ae88 100644 --- a/gemini/src/p2p/client.rs +++ b/gemini/src/p2p/client.rs @@ -10,8 +10,9 @@ use quinn::{rustls, ClientConfig, Connection, Endpoint}; use tokio::sync::mpsc; use tracing::info; use uuid::Uuid; +use vault::get_peerid; -use crate::p2p::relay::ReceiveData; +use crate::p2p::relay::{ReceiveData, SenderData}; use crate::p2p::Action; use super::{get_certificate, ALPN_QUIC_HTTP}; @@ -92,9 +93,7 @@ pub async fn get_client_connection(bootstrap_node: String) -> anyhow::Result anyhow::Result, + bootstrap_node: String, +) -> anyhow::Result> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + // 建立 QUIC 连接 + let connection = get_client_connection(bootstrap_node).await?; + + let remote_address = connection.remote_address(); + let stable_id = connection.stable_id(); + info!("established connection: {remote_address:#?},{stable_id:#?}"); + let connection = Arc::new(connection); + + let connection_clone = connection.clone(); + let local_peer_id = get_peerid(); + tokio::spawn(async move { + let (mut sender, _) = connection_clone.open_bi().await.unwrap(); + let send = ReceiveData { + from: local_peer_id.clone(), + data: data.clone(), + func: func.to_string(), + action: Action::Send, + to: to_peer_id.to_string(), + req_id: Uuid::new_v4().into(), + }; + let json = serde_json::to_string(&send).unwrap(); + sender.write_all(json.as_bytes()).await.unwrap(); + sender.finish().unwrap(); + }); + + let connection_clone = connection.clone(); + + tokio::spawn(async move { + let (_, mut quic_recv) = connection_clone.accept_bi().await.unwrap(); + // 等待接收一个新的双向流 + let buffer = quic_recv.read_to_end(1024 * 1024).await.unwrap(); + println!("QUIC Received:\n{}", String::from_utf8_lossy(&*buffer)); + if tx.send(buffer).is_err() { + println!("Receiver closed"); + return; + } + }); + let message = match rx.await { + Ok(r) => r, + Err(e) => { + return Err(anyhow!("QUIC Received Error:\n{:?}", e)); + } + }; + println!( + "Channel Received message: {}", + String::from_utf8_lossy(&message) + ); + let data: SenderData = match serde_json::from_slice(&*message) { + Ok(data) => data, + Err(e) => { + eprintln!("QUIC Received Error:\n{:?}", e); + return Err(anyhow!("QUIC Received Error:\n{:?}", e)); + } + }; + return Ok(data.data); +} From dcddce5e20aa9bba7332a6d818411c8e71a77425 Mon Sep 17 00:00:00 2001 From: wujian0327 <353981613@qq.com> Date: Mon, 24 Feb 2025 11:10:23 +0800 Subject: [PATCH 3/6] feature: add ca service for p2p --- aries/src/service/api/ca_router.rs | 66 ++------- aries/src/service/api/nostr_router.rs | 2 +- aries/src/service/ca_server.rs | 147 -------------------- aries/src/service/mod.rs | 1 - aries/src/service/relay_server.rs | 3 - cert.der | Bin 355 -> 0 bytes gateway/src/api/nostr_router.rs | 34 +---- gateway/src/https_server.rs | 5 +- gemini/Cargo.toml | 4 +- gemini/src/ca/client.rs | 74 +++++++++++ gemini/src/ca/mod.rs | 184 ++++++-------------------- gemini/src/ca/server.rs | 103 ++++++++++++++ gemini/src/p2p/client.rs | 101 ++++++++------ gemini/src/p2p/mod.rs | 37 +----- gemini/src/p2p/relay.rs | 89 +++++++++---- key.der | Bin 138 -> 0 bytes 16 files changed, 360 insertions(+), 490 deletions(-) delete mode 100755 aries/src/service/ca_server.rs delete mode 100644 cert.der create mode 100644 gemini/src/ca/client.rs create mode 100644 gemini/src/ca/server.rs delete mode 100644 key.der diff --git a/aries/src/service/api/ca_router.rs b/aries/src/service/api/ca_router.rs index 184ac3b1..6b2e60d8 100644 --- a/aries/src/service/api/ca_router.rs +++ b/aries/src/service/api/ca_router.rs @@ -11,12 +11,7 @@ use regex::Regex; use crate::service::relay_server::AppState; pub fn routers() -> Router { - Router::new().route( - "/{*path}", - get(get_method_router) - .post(post_method_router) - .delete(delete_method_router), - ) + Router::new().route("/{*path}", get(get_method_router).post(post_method_router)) } async fn get_method_router( @@ -28,16 +23,16 @@ async fn get_method_router( .unwrap() .is_match(uri.path()) { - let name = match gemini::ca::get_cert_name_from_path(uri.path()) { + let name = match gemini::ca::server::get_cert_name_from_path(uri.path()) { Some(n) => n, None => { - return gemini::ca::response_error( - StatusCode::BAD_REQUEST.as_u16(), - "Bad request".to_string(), - ) + return Err((StatusCode::BAD_REQUEST, "Bad request".to_string())); } }; - return gemini::ca::get_certificate(name).await; + return match gemini::ca::server::get_certificate(name).await { + Ok(cert) => Ok(Response::builder().body(Body::from(cert)).unwrap()), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), + }; } Err(( StatusCode::NOT_FOUND, @@ -54,49 +49,16 @@ async fn post_method_router( .unwrap() .is_match(uri.path()) { - let name = match gemini::ca::get_cert_name_from_path(uri.path()) { + let name = match gemini::ca::server::get_cert_name_from_path(uri.path()) { Some(n) => n, - None => { - return gemini::ca::response_error( - StatusCode::BAD_REQUEST.as_u16(), - "Bad request".to_string(), - ) - } - }; - return gemini::ca::issue_certificate(name).await; - } else if Regex::new(r"/sign/[a-zA-Z0-9]+$") - .unwrap() - .is_match(uri.path()) - { - let name = match gemini::ca::get_hub_name_from_path(uri.path()) { - Some(n) => n, - None => { - return gemini::ca::response_error( - StatusCode::BAD_REQUEST.as_u16(), - "Bad request".to_string(), - ) - } + None => return Err((StatusCode::BAD_REQUEST, "Bad request".to_string())), }; let bytes = to_bytes(req.into_body(), usize::MAX).await.unwrap(); - let pubkey = String::from_utf8(bytes.to_vec()).unwrap(); - return gemini::ca::sign_certificate(name, pubkey).await; - } - Err(( - StatusCode::NOT_FOUND, - String::from("Operation not supported\n"), - )) -} - -async fn delete_method_router( - _state: State, - uri: Uri, - _req: Request, -) -> Result, (StatusCode, String)> { - if Regex::new(r"/certificates/[a-zA-Z0-9]+$") - .unwrap() - .is_match(uri.path()) - { - return gemini::ca::delete_certificate(uri.path()).await; + let csr = String::from_utf8(bytes.to_vec()).unwrap(); + return match gemini::ca::server::issue_certificate(name, csr).await { + Ok(cert) => Ok(Response::builder().body(Body::from(cert)).unwrap()), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), + }; } Err(( StatusCode::NOT_FOUND, diff --git a/aries/src/service/api/nostr_router.rs b/aries/src/service/api/nostr_router.rs index d99239d1..03b58920 100644 --- a/aries/src/service/api/nostr_router.rs +++ b/aries/src/service/api/nostr_router.rs @@ -113,7 +113,7 @@ async fn recieve( } } -async fn transfer_event_to_subscribed_nodes( +async fn _transfer_event_to_subscribed_nodes( storage: ZTMStorage, nostr_event: NostrEvent, ztm_agent_port: u16, diff --git a/aries/src/service/ca_server.rs b/aries/src/service/ca_server.rs deleted file mode 100755 index 26098eb3..00000000 --- a/aries/src/service/ca_server.rs +++ /dev/null @@ -1,147 +0,0 @@ -use std::net::SocketAddr; -use std::str::FromStr; - -use axum::body::{to_bytes, Body}; -use axum::extract::{Query, State}; -use axum::http::{Request, Response, StatusCode, Uri}; -use axum::routing::get; -use axum::Router; -use common::config::Config; -use gemini::RelayGetParams; -use jupiter::context::Context; -use regex::Regex; -use tower::ServiceBuilder; -use tower_http::cors::{Any, CorsLayer}; -use tower_http::decompression::RequestDecompressionLayer; -use tower_http::trace::TraceLayer; - -pub async fn run_ca_server(config: Config, port: u16) { - let host = "127.0.0.1".to_string(); - let app = app(config.clone(), host.clone(), port).await; - - let server_url = format!("{}:{}", host, port); - tracing::info!("start ca server: {server_url}"); - let addr = SocketAddr::from_str(&server_url).unwrap(); - let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - axum::serve(listener, app.into_make_service()) - .await - .unwrap(); -} - -#[derive(Clone)] -pub struct AppState { - pub context: Context, - pub host: String, - pub port: u16, -} - -pub async fn app(config: Config, host: String, port: u16) -> Router { - let state = AppState { - host, - port, - context: Context::new(config.clone()).await, - }; - routers(state) -} - -pub fn routers(state: AppState) -> Router { - Router::new() - .route( - "/*path", - get(get_method_router) - .post(post_method_router) - .delete(delete_method_router), - ) - .layer(ServiceBuilder::new().layer(CorsLayer::new().allow_origin(Any))) - .layer(TraceLayer::new_for_http()) - .layer(RequestDecompressionLayer::new()) - .with_state(state) -} - -async fn get_method_router( - _state: State, - Query(_params): Query, - uri: Uri, -) -> Result, (StatusCode, String)> { - if Regex::new(r"/certificates/[a-zA-Z0-9]+$") - .unwrap() - .is_match(uri.path()) - { - let name = match gemini::ca::get_cert_name_from_path(uri.path()) { - Some(n) => n, - None => { - return gemini::ca::response_error( - StatusCode::BAD_REQUEST.as_u16(), - "Bad request".to_string(), - ) - } - }; - return gemini::ca::get_certificate(name).await; - } - Err(( - StatusCode::NOT_FOUND, - String::from("Operation not supported\n"), - )) -} - -async fn post_method_router( - _state: State, - uri: Uri, - req: Request, -) -> Result, (StatusCode, String)> { - if Regex::new(r"/certificates/[a-zA-Z0-9]+$") - .unwrap() - .is_match(uri.path()) - { - let name = match gemini::ca::get_cert_name_from_path(uri.path()) { - Some(n) => n, - None => { - return gemini::ca::response_error( - StatusCode::BAD_REQUEST.as_u16(), - "Bad request".to_string(), - ) - } - }; - return gemini::ca::issue_certificate(name).await; - } else if Regex::new(r"/sign/hub/[a-zA-Z0-9]+$") - .unwrap() - .is_match(uri.path()) - { - let name = match gemini::ca::get_hub_name_from_path(uri.path()) { - Some(n) => n, - None => { - return gemini::ca::response_error( - StatusCode::BAD_REQUEST.as_u16(), - "Bad request".to_string(), - ) - } - }; - let bytes = to_bytes(req.into_body(), usize::MAX).await.unwrap(); - let pubkey = String::from_utf8(bytes.to_vec()).unwrap(); - return gemini::ca::sign_certificate(name, pubkey).await; - } - Err(( - StatusCode::NOT_FOUND, - String::from("Operation not supported\n"), - )) -} - -async fn delete_method_router( - _state: State, - uri: Uri, - _req: Request, -) -> Result, (StatusCode, String)> { - if Regex::new(r"/certificates/[a-zA-Z0-9]+$") - .unwrap() - .is_match(uri.path()) - { - return gemini::ca::delete_certificate(uri.path()).await; - } - Err(( - StatusCode::NOT_FOUND, - String::from("Operation not supported\n"), - )) -} - -#[cfg(test)] -mod tests {} diff --git a/aries/src/service/mod.rs b/aries/src/service/mod.rs index a33daed5..ec6bc3dc 100644 --- a/aries/src/service/mod.rs +++ b/aries/src/service/mod.rs @@ -3,7 +3,6 @@ use gemini::nostr::client_message::Filter; use serde::{Deserialize, Serialize}; pub mod api; -pub mod ca_server; pub mod relay_server; #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/aries/src/service/relay_server.rs b/aries/src/service/relay_server.rs index 0b9e1d8c..39886ca0 100644 --- a/aries/src/service/relay_server.rs +++ b/aries/src/service/relay_server.rs @@ -23,9 +23,6 @@ pub struct RelayOptions { #[arg(long, default_value_t = 8001)] pub relay_port: u16, - #[arg(long, default_value_t = 9999)] - pub ca_port: u16, - #[arg(long, short)] pub config: Option, } diff --git a/cert.der b/cert.der deleted file mode 100644 index 1d003571265b59c9bc149b8928634206445da2e1..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 355 zcmXqLVvILvVq{ss%*4pVB;uJgC2wm9BilLedmC-I_CAkdu9h?4V&l+i^EhYA!pvl# zXee(W%f=ka!ptL9l$@TLr%;@llcrFdnVy%LqL7?gRAQhY&TDRJU|k4&7@^iPGqN)~F|b&!wt3xGcVpg{*O%06qzhK9Z`!a+pt#g-i|m3A zyb7_EhYxCKpF10{dqLV1@%hyqJ)0lQ*=tZSW3t%f?O#v2edSmzVIXE80`!8cAPcVn zcN1q$esW??Mt*S#vPYRc84TQ*6d8^lTd!U4@yF8*8t=Sw_i#9E*6zTme}CGFsaKu7T`Tg1kFJmd08s;WY5)KL diff --git a/gateway/src/api/nostr_router.rs b/gateway/src/api/nostr_router.rs index 0d3d42b0..e7104201 100644 --- a/gateway/src/api/nostr_router.rs +++ b/gateway/src/api/nostr_router.rs @@ -19,7 +19,6 @@ use crate::api::MegaApiServiceState; pub fn routers() -> Router { Router::new() .route("/nostr", post(recieve)) - .route("/nostr/quic/send_event", post(send_quic)) .route("/nostr/send_event", post(send)) .route("/nostr/event_list", get(event_list)) } @@ -134,7 +133,9 @@ async fn send( } }; - let git_event = git_event_req.to_git_event(identifier, git_ref.ref_git_id).await; + let git_event = git_event_req + .to_git_event(identifier, git_ref.ref_git_id) + .await; match git_event.sent_to_relay(bootstrap_node.clone()).await { Ok(_) => { @@ -151,35 +152,6 @@ async fn send( Ok(Json(CommonResult::success(None))) } -async fn send_quic( - state: State, - body: String, -) -> Result>, (StatusCode, String)> { - let bootstrap_node = match state.p2p.bootstrap_node.clone() { - Some(b) => b, - None => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "bootstrap node is not set".to_string(), - )); - } - }; - - match gemini::p2p::client::send( - "1".to_string(), - "nostr_event".to_string(), - "hello".as_bytes().to_vec(), - bootstrap_node, - ) - .await - { - Ok(_) => {} - Err(_) => {} - } - - Ok(Json(CommonResult::success(None))) -} - pub async fn event_list( Query(_query): Query>, state: State, diff --git a/gateway/src/https_server.rs b/gateway/src/https_server.rs index 9fd526e5..6e478e9b 100644 --- a/gateway/src/https_server.rs +++ b/gateway/src/https_server.rs @@ -1,27 +1,24 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use std::{thread, time}; use axum::routing::get; use axum::{http, Router}; use axum_server::tls_rustls::RustlsConfig; use clap::Args; -use gemini::cache::cache_public_repo_and_lfs; use tower::ServiceBuilder; use tower_http::cors::{Any, CorsLayer}; use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; use common::model::{CommonOptions, P2pOptions}; -use gemini::ztm::agent::{run_ztm_client, LocalZTMAgent}; use jupiter::context::Context; use mono::api::lfs::lfs_router; use mono::api::MonoApiServiceState; use mono::server::https_server::{get_method_router, post_method_router, AppState}; -use crate::api::{github_router, nostr_router, ztm_router, MegaApiServiceState}; +use crate::api::{github_router, nostr_router, MegaApiServiceState}; #[derive(Args, Clone, Debug)] pub struct HttpOptions { diff --git a/gemini/Cargo.toml b/gemini/Cargo.toml index 55e74792..574eaaa1 100644 --- a/gemini/Cargo.toml +++ b/gemini/Cargo.toml @@ -31,6 +31,6 @@ async-trait = { workspace = true } quinn = "0.11" anyhow = { workspace = true } lazy_static = { workspace = true } -rcgen = "0.13" +rcgen = {version="0.13",features=["x509-parser","pem", "crypto"]} dashmap = "6.0.1" -uuid = { workspace = true } \ No newline at end of file +uuid = { workspace = true } diff --git a/gemini/src/ca/client.rs b/gemini/src/ca/client.rs new file mode 100644 index 00000000..c818b81a --- /dev/null +++ b/gemini/src/ca/client.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use anyhow::{anyhow, Ok}; +use rcgen::{CertificateParams, KeyPair}; +use reqwest::Client; +use vault::get_peerid; + +use super::{get_from_vault, save_to_vault}; + +static USER_KEY: &str = "user_key"; + +pub async fn get_user_key() -> String { + match get_from_vault(USER_KEY.to_string()).await { + Some(key) => key, + None => { + let user_key = KeyPair::generate().unwrap(); + save_to_vault(USER_KEY.to_string(), user_key.serialize_pem()).await; + user_key.serialize_pem() + } + } +} + +pub async fn get_user_cert_from_ca(ca: String) -> Result { + let name = get_peerid().await; + //request to ca + let url = format!("{ca}/api/v1/ca/certificates/{name}"); + let url = add_http_to_url(url); + let client = Client::new(); + let response = client.get(url.clone()).send().await.unwrap(); + if response.status().is_success() { + //cert exists + return Ok(response.text().await.unwrap()); + } + + let params = CertificateParams::new(vec![name]).unwrap(); + + let key = get_user_key().await; + let key = KeyPair::from_pem(&key).unwrap(); + let user_csr = params.serialize_request(&key).unwrap(); + + //request a new cert + let response = client + .post(url) + .body(user_csr.pem().unwrap()) + .send() + .await + .unwrap(); + if response.status().is_success() { + return Ok(response.text().await.unwrap()); + } + + Err(anyhow!("get user certificate from ca failed")) +} + +pub async fn get_ca_cert_from_ca(ca: String) -> Result { + //request to ca + let url = format!("{ca}/api/v1/ca/certificates/ca"); + let url = add_http_to_url(url); + let client = Client::new(); + let response = client.get(url.clone()).send().await?; + if response.status().is_success() { + return Ok(response.text().await?); + } + + Err(anyhow!("get user certificate from ca failed")) +} + +fn add_http_to_url(url: String) -> String { + if url.starts_with("http://") { + return url; + } + + let url = format!("http://{url}"); + url +} diff --git a/gemini/src/ca/mod.rs b/gemini/src/ca/mod.rs index ed4a7c41..17a80b07 100755 --- a/gemini/src/ca/mod.rs +++ b/gemini/src/ca/mod.rs @@ -1,133 +1,7 @@ -use axum::{body::Body, response::Response}; -use reqwest::StatusCode; -use serde::{Deserialize, Serialize}; use serde_json::json; -#[derive(Deserialize, Serialize, Debug, Clone)] -struct ErrorResult { - status: u16, - #[serde(rename = "message")] - msg: String, -} - -impl ErrorResult { - pub fn to_json_string(&self) -> String { - serde_json::to_string(self).unwrap() - } - pub fn to_json_string_new(status: u16, msg: String) -> String { - let e = ErrorResult { status, msg }; - e.to_json_string() - } -} - -pub async fn get_certificate(name: String) -> Result { - if name == "ca" { - return Ok(Response::builder() - .body(Body::from(vault::pki::get_root_cert().await)) - .unwrap()); - } - let cert_option = get_from_vault(name).await; - match cert_option { - Some(cert) => Ok(Response::builder().body(Body::from(cert)).unwrap()), - None => response_error( - StatusCode::NOT_FOUND.as_u16(), - "Username not found".to_string(), - ), - } -} - -pub async fn issue_certificate(name: String) -> Result { - if is_reserved_name(name.clone()) { - return response_error( - StatusCode::FORBIDDEN.as_u16(), - "Reserved username".to_string(), - ); - } - // let cert_option = get_from_vault(name.clone()); - // if cert_option.is_some() { - // return response_error( - // StatusCode::CONFLICT.as_u16(), - // "Username already exists".to_string(), - // ); - // } - let (cert_pem, private_key) = vault::pki::issue_cert(json!({ - "ttl": "10d", - "common_name": name, - })).await; - //save cert to vault - save_to_vault(name, cert_pem).await; - Ok(Response::builder().body(Body::from(private_key)).unwrap()) -} - -pub async fn sign_certificate( - name: String, - pubkey: String, -) -> Result { - tracing::info!("sign_certificate,name:{name},pubkey:{pubkey}"); - if is_reserved_name(name.clone()) { - return response_error( - StatusCode::FORBIDDEN.as_u16(), - "Reserved username".to_string(), - ); - } - let cert_option = get_from_vault(name.clone()).await; - if cert_option.is_some() { - return response_error( - StatusCode::CONFLICT.as_u16(), - "Username already exists".to_string(), - ); - } - let (cert_pem, private_key) = vault::pki::issue_cert(json!({ - "ttl": "10d", - "common_name": name, - })).await; - //save cert to vault - save_to_vault(name, cert_pem).await; - Ok(Response::builder().body(Body::from(private_key)).unwrap()) -} - -pub async fn delete_certificate(path: &str) -> Result { - let name = match get_cert_name_from_path(path) { - Some(n) => n, - None => return response_error(StatusCode::BAD_REQUEST.as_u16(), "Bad request".to_string()), - }; - if is_reserved_name(name.clone()) { - return response_error( - StatusCode::FORBIDDEN.as_u16(), - "Reserved username".to_string(), - ); - } - delete_to_vault(name); - Ok(Response::builder() - .status(204) - .body(Body::from("")) - .unwrap()) -} - -pub fn get_cert_name_from_path(path: &str) -> Option { - let v: Vec<&str> = path.split('/').collect(); - v.get(3).map(|s| s.to_string()) -} - -pub fn get_hub_name_from_path(path: &str) -> Option { - let v: Vec<&str> = path.split('/').collect(); - v.get(4).map(|s| s.to_string()) -} - -fn is_reserved_name(name: String) -> bool { - if name == "ca" { - return true; - } - is_hub_name(name) -} - -fn is_hub_name(_name: String) -> bool { - // if name == "hub" || name.starts_with("hub/") { - // return true; - // } - // false - false -} +pub mod client; +pub mod server; async fn save_to_vault(key: String, value: String) { let key_f = format!("ca_{key}"); @@ -137,7 +11,9 @@ async fn save_to_vault(key: String, value: String) { .as_object() .unwrap() .clone(); - vault::vault::write_secret(key_f.as_str(), Some(kv_data.clone())).await.unwrap(); + vault::vault::write_secret(key_f.as_str(), Some(kv_data.clone())) + .await + .unwrap(); } async fn get_from_vault(key: String) -> Option { @@ -157,18 +33,44 @@ async fn get_from_vault(key: String) -> Option { } } -fn delete_to_vault(_key: String) { - // let key_f = format!("ca_{key}"); - // vault::vault::write_secret(key_f.as_str(), Some(Map).unwrap()); +async fn _delete_to_vault(key: String) { + let key_f = format!("ca_{key}"); + vault::vault::write_secret(key_f.as_str(), None) + .await + .unwrap(); } -pub fn response_error(status: u16, message: String) -> Result { - Ok({ - let error_result = ErrorResult::to_json_string_new(status, message); - Response::builder() - .status(status) - .header("Content-Type", "application/json") - .body(Body::from(error_result)) - .unwrap() - }) +#[cfg(test)] +mod tests { + + use quinn::rustls::pki_types::{pem::PemObject, CertificateSigningRequestDer}; + use rcgen::{ + generate_simple_self_signed, CertificateParams, CertificateSigningRequestParams, + CertifiedKey, KeyPair, + }; + + #[tokio::test] + async fn self_signed_cert() { + let subject_alt_names = vec!["localhost".to_string()]; + + let CertifiedKey { cert, key_pair } = + generate_simple_self_signed(subject_alt_names).unwrap(); + print!("root_cert:{}", cert.pem()); + + let name = "localhost"; + let params = CertificateParams::new(vec![name.into()]).unwrap(); + + let user_key_pair = KeyPair::generate().unwrap(); + let user_csr = params.serialize_request(&user_key_pair).unwrap(); + + let csrd = CertificateSigningRequestDer::from_pem_slice(user_csr.pem().unwrap().as_bytes()) + .unwrap(); + let csrq = CertificateSigningRequestParams::from_der(&csrd).unwrap(); + let user_cert = csrq.signed_by(&cert, &key_pair).unwrap(); + + // let c = CertificateSigningRequestDer::from_pem_slice(user_csr.pem().unwrap().as_bytes()) + // .unwrap(); + // let user_cert = params.signed_by(&user_key_pair, &cert, &key_pair).unwrap(); + print!("user_cert:{}", user_cert.pem()); + } } diff --git a/gemini/src/ca/server.rs b/gemini/src/ca/server.rs new file mode 100644 index 00000000..46dfb56b --- /dev/null +++ b/gemini/src/ca/server.rs @@ -0,0 +1,103 @@ +use quinn::rustls::pki_types::CertificateSigningRequestDer; +use quinn::rustls::pki_types::{pem::PemObject, CertificateDer, PrivateKeyDer}; +use rcgen::{ + generate_simple_self_signed, CertificateParams, CertificateSigningRequestParams, CertifiedKey, + KeyPair, +}; + +use anyhow::anyhow; +use anyhow::Result; + +use crate::ca::save_to_vault; + +use super::get_from_vault; + +static ROOT_CERT: &str = "root_cert"; +static ROOT_KEY: &str = "root_key"; + +static USER_KEY_PRE: &str = "user_"; + +pub async fn get_root_cert_pem() -> String { + match get_from_vault(ROOT_CERT.to_string()).await { + Some(cert) => cert, + None => init_self_signed_cert().await.0, + } +} + +pub async fn get_root_cert_der() -> CertificateDer<'static> { + let cert = get_root_cert_pem().await; + let cert = CertificateDer::from_pem_slice(cert.as_bytes()).unwrap(); + cert +} + +pub async fn get_root_key_pem() -> String { + match get_from_vault(ROOT_KEY.to_string()).await { + Some(key) => key, + None => init_self_signed_cert().await.1, + } +} + +pub async fn get_root_key_der() -> PrivateKeyDer<'static> { + let key = get_root_key_pem().await; + let key = PrivateKeyDer::from_pem_slice(key.as_bytes()).unwrap(); + key +} + +async fn init_self_signed_cert() -> (String, String) { + let subject_alt_names = vec!["localhost".to_string()]; + + let CertifiedKey { cert, key_pair } = generate_simple_self_signed(subject_alt_names).unwrap(); + save_to_vault(ROOT_CERT.to_string(), cert.pem()).await; + save_to_vault(ROOT_KEY.to_string(), key_pair.serialize_pem()).await; + (cert.pem(), key_pair.serialize_pem()) +} + +pub async fn get_certificate(name: String) -> Result { + if name == "ca" { + return Ok(get_root_cert_pem().await); + } + + let cert_option = get_from_vault(add_user_key_pre(name)).await; + match cert_option { + Some(cert) => Ok(cert), + None => Err(anyhow!("Username not found")), + } +} + +pub async fn issue_certificate(name: String, csr: String) -> Result { + tracing::info!("sign_certificate, name:{name},csr:{csr}"); + let ca_key = KeyPair::from_pem(get_root_key_pem().await.as_str()).unwrap(); + let params = CertificateParams::from_ca_cert_pem(get_root_cert_pem().await.as_str()).unwrap(); + let ca_cert = params.self_signed(&ca_key).unwrap(); + + let csrd = match CertificateSigningRequestDer::from_pem_slice(csr.as_bytes()) { + Ok(csrd) => csrd, + Err(e) => return Err(anyhow!(e.to_string())), + }; + let csrq = CertificateSigningRequestParams::from_der(&csrd).unwrap(); + let user_cert = csrq.signed_by(&ca_cert, &ca_key).unwrap(); + + save_to_vault(add_user_key_pre(name), user_cert.pem()).await; + Ok(user_cert.pem()) +} + +fn _is_reserved_key(name: String) -> bool { + if [ROOT_CERT.to_string(), ROOT_KEY.to_string()].contains(&name) { + return true; + } + false +} + +fn add_user_key_pre(name: String) -> String { + format!("{}{}", USER_KEY_PRE, name) +} + +pub fn get_cert_name_from_path(path: &str) -> Option { + let v: Vec<&str> = path.split('/').collect(); + v.get(3).map(|s| s.to_string()) +} + +pub fn get_hub_name_from_path(path: &str) -> Option { + let v: Vec<&str> = path.split('/').collect(); + v.get(4).map(|s| s.to_string()) +} diff --git a/gemini/src/p2p/client.rs b/gemini/src/p2p/client.rs index 1564ae88..9216fb57 100644 --- a/gemini/src/p2p/client.rs +++ b/gemini/src/p2p/client.rs @@ -4,26 +4,36 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use anyhow::{bail, Context, Result}; +use anyhow::Ok; +use anyhow::Result; use quinn::crypto::rustls::QuicClientConfig; -use quinn::{rustls, ClientConfig, Connection, Endpoint}; +use quinn::rustls::pki_types::pem::PemObject; +use quinn::rustls::pki_types::CertificateDer; +use quinn::rustls::pki_types::PrivateKeyDer; +use quinn::{rustls, ClientConfig, Endpoint}; use tokio::sync::mpsc; use tracing::info; use uuid::Uuid; use vault::get_peerid; +use crate::ca; use crate::p2p::relay::{ReceiveData, SenderData}; use crate::p2p::Action; -use super::{get_certificate, ALPN_QUIC_HTTP}; +use super::ALPN_QUIC_HTTP; pub async fn run(bootstrap_node: String) -> Result<()> { - info!("Start"); rustls::crypto::ring::default_provider() .install_default() .expect("Failed to install rustls crypto provider"); - let connection = get_client_connection(bootstrap_node).await?; + let endpoint = get_client_endpoint(bootstrap_node.clone()).await?; + + let server_addr: SocketAddr = bootstrap_node.parse()?; + let connection = endpoint + .connect(server_addr, "localhost")? + .await + .map_err(|e| anyhow!("failed to connect: {}", e))?; let remote_address = connection.remote_address(); let stable_id = connection.stable_id(); @@ -34,7 +44,7 @@ pub async fn run(bootstrap_node: String) -> Result<()> { let (tx, mut rx) = mpsc::channel(8); - let peer_id = vault::get_peerid(); + let peer_id = vault::get_peerid().await; tokio::spawn(async move { loop { @@ -50,7 +60,7 @@ pub async fn run(bootstrap_node: String) -> Result<()> { let json = serde_json::to_string(&ping).unwrap(); quic_send.write_all(json.as_ref()).await.unwrap(); quic_send.finish().unwrap(); - tokio::time::sleep(Duration::from_secs(20)).await; + tokio::time::sleep(Duration::from_secs(60)).await; } }); @@ -59,7 +69,7 @@ pub async fn run(bootstrap_node: String) -> Result<()> { loop { let (_, mut quic_recv) = connection_clone.accept_bi().await.unwrap(); let buffer = quic_recv.read_to_end(1024 * 1024).await.unwrap(); - info!("QUIC Received:\n{}", String::from_utf8_lossy(&*buffer)); + info!("QUIC Received:\n{}", String::from_utf8_lossy(&buffer)); if tx.send(buffer).await.is_err() { info!("Receiver closed"); return; @@ -78,30 +88,25 @@ pub async fn run(bootstrap_node: String) -> Result<()> { Ok(()) } -pub async fn get_client_connection(bootstrap_node: String) -> anyhow::Result { - let (certs, _key) = get_certificate().await?; +pub async fn get_client_endpoint(bootstrap_node: String) -> anyhow::Result { + let (user_cert, user_key) = get_user_cert_from_ca(bootstrap_node.clone()).await?; + let ca_cert = get_ca_cert_from_ca(bootstrap_node.clone()).await?; let mut roots = rustls::RootCertStore::empty(); - for ele in certs { - roots.add(ele)?; - } + roots.add(ca_cert).unwrap(); let mut client_crypto = rustls::ClientConfig::builder() .with_root_certificates(roots) - .with_no_client_auth(); - + .with_client_auth_cert([user_cert].to_vec(), user_key) + .unwrap(); client_crypto.alpn_protocols = ALPN_QUIC_HTTP.iter().map(|&x| x.into()).collect(); + client_crypto.enable_early_data = true; let client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(client_crypto)?)); let mut endpoint = Endpoint::client(SocketAddr::from_str("127.0.0.1:0").unwrap())?; endpoint.set_default_client_config(client_config); - let server_addr: SocketAddr = bootstrap_node.parse()?; - let conn = endpoint - .connect(server_addr, "localhost")? - .await - .map_err(|e| anyhow!("failed to connect: {}", e))?; - Ok(conn) + Ok(endpoint) } pub async fn send( @@ -109,11 +114,16 @@ pub async fn send( func: String, data: Vec, bootstrap_node: String, -) -> anyhow::Result> { +) -> Result> { let (tx, rx) = tokio::sync::oneshot::channel(); - // 建立 QUIC 连接 - let connection = get_client_connection(bootstrap_node).await?; + let endpoint = get_client_endpoint(bootstrap_node.clone()).await?; + + let server_addr: SocketAddr = bootstrap_node.parse()?; + let connection = endpoint + .connect(server_addr, "localhost")? + .await + .map_err(|e| anyhow!("failed to connect: {}", e))?; let remote_address = connection.remote_address(); let stable_id = connection.stable_id(); @@ -121,7 +131,7 @@ pub async fn send( let connection = Arc::new(connection); let connection_clone = connection.clone(); - let local_peer_id = get_peerid(); + let local_peer_id = get_peerid().await; tokio::spawn(async move { let (mut sender, _) = connection_clone.open_bi().await.unwrap(); let send = ReceiveData { @@ -141,30 +151,33 @@ pub async fn send( tokio::spawn(async move { let (_, mut quic_recv) = connection_clone.accept_bi().await.unwrap(); - // 等待接收一个新的双向流 let buffer = quic_recv.read_to_end(1024 * 1024).await.unwrap(); - println!("QUIC Received:\n{}", String::from_utf8_lossy(&*buffer)); + info!("QUIC Received:\n{}", String::from_utf8_lossy(&buffer)); if tx.send(buffer).is_err() { - println!("Receiver closed"); - return; + info!("Receiver closed"); } }); - let message = match rx.await { - Ok(r) => r, - Err(e) => { - return Err(anyhow!("QUIC Received Error:\n{:?}", e)); - } - }; - println!( + let message = rx.await?; + info!( "Channel Received message: {}", String::from_utf8_lossy(&message) ); - let data: SenderData = match serde_json::from_slice(&*message) { - Ok(data) => data, - Err(e) => { - eprintln!("QUIC Received Error:\n{:?}", e); - return Err(anyhow!("QUIC Received Error:\n{:?}", e)); - } - }; - return Ok(data.data); + let data: SenderData = serde_json::from_slice(&message)?; + Ok(data.data) +} + +pub async fn get_user_cert_from_ca( + ca: String, +) -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> { + let cert = ca::client::get_user_cert_from_ca(ca).await?; + let cert = CertificateDer::from_pem_slice(cert.as_bytes())?; + let key = ca::client::get_user_key().await; + let key = PrivateKeyDer::from_pem_slice(key.as_bytes())?; + Ok((cert, key)) +} + +pub async fn get_ca_cert_from_ca(ca: String) -> Result> { + let cert = ca::client::get_ca_cert_from_ca(ca).await?; + let cert = CertificateDer::from_pem_slice(cert.as_bytes())?; + Ok(cert) } diff --git a/gemini/src/p2p/mod.rs b/gemini/src/p2p/mod.rs index 1a92246e..869b9ddf 100644 --- a/gemini/src/p2p/mod.rs +++ b/gemini/src/p2p/mod.rs @@ -1,14 +1,11 @@ -use std::{fmt, fs, io}; +use std::fmt; -use anyhow::{bail, Context}; -use quinn::rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use serde::{Deserialize, Serialize}; -use tracing::info; pub mod client; pub mod relay; -pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; +pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"h3"]; #[derive(Serialize, Deserialize, Debug)] pub enum Action { @@ -36,33 +33,3 @@ impl fmt::Display for Action { } } } - -pub async fn get_certificate( -) -> anyhow::Result<(Vec>, PrivateKeyDer<'static>)> { - let (certs, key) = { - let cert_path = "cert.der"; - let key_path = "key.der"; - let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) { - Ok((cert, key)) => ( - CertificateDer::from(cert), - PrivateKeyDer::try_from(key).map_err(anyhow::Error::msg)?, - ), - Err(ref e) if e.kind() == io::ErrorKind::NotFound => { - info!("generating self-signed certificate"); - let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); - let key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); - let cert = cert.cert.into(); - fs::write(&cert_path, &cert).context("failed to write certificate")?; - fs::write(&key_path, key.secret_pkcs8_der()) - .context("failed to write private key")?; - (cert, key.into()) - } - Err(e) => { - bail!("failed to read certificate: {}", e); - } - }; - - (vec![cert], key) - }; - Ok((certs, key)) -} diff --git a/gemini/src/p2p/relay.rs b/gemini/src/p2p/relay.rs index 49f06e5e..75a7dab3 100644 --- a/gemini/src/p2p/relay.rs +++ b/gemini/src/p2p/relay.rs @@ -1,20 +1,22 @@ use anyhow::anyhow; -use anyhow::{bail, Context, Result}; +use anyhow::Result; use dashmap::DashMap; use lazy_static::lazy_static; +use quinn::rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use quinn::rustls::server::WebPkiClientVerifier; use quinn::{ crypto::rustls::QuicServerConfig, - rustls::{ - self, - pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}, - }, + rustls::{self}, RecvStream, SendStream, }; +use quinn::{IdleTimeout, ServerConfig, TransportConfig, VarInt}; use serde::{Deserialize, Serialize}; -use std::{fs, io, net::SocketAddr, str::FromStr, sync::Arc}; +use std::time::Duration; +use std::{net::SocketAddr, str::FromStr, sync::Arc}; use tracing::{error, info, info_span, Instrument}; -use crate::p2p::{get_certificate, ALPN_QUIC_HTTP}; +use crate::ca; +use crate::p2p::ALPN_QUIC_HTTP; use super::Action; @@ -44,24 +46,11 @@ pub struct SenderData { } pub async fn run(host: String, port: u16) -> Result<()> { - rustls::crypto::ring::default_provider() - .install_default() - .expect("Failed to install rustls crypto provider"); - - let (certs, key) = get_certificate().await?; - - let mut server_crypto = rustls::ServerConfig::builder() - .with_no_client_auth() - .with_single_cert(certs, key)?; - server_crypto.alpn_protocols = ALPN_QUIC_HTTP.iter().map(|&x| x.into()).collect(); - - let server_config = - quinn::ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(server_crypto)?)); - + let server_config = get_server_config().await?; let addr = format!("{}:{}", host, port); let endpoint = quinn::Endpoint::server(server_config, SocketAddr::from_str(addr.as_str()).unwrap())?; - info!("listening on {}", endpoint.local_addr()?); + info!("Quic server listening on udp {}", endpoint.local_addr()?); while let Some(conn) = endpoint.accept().await { { @@ -78,6 +67,39 @@ pub async fn run(host: String, port: u16) -> Result<()> { Ok(()) } +pub async fn get_server_config() -> Result { + rustls::crypto::ring::default_provider() + .install_default() + .expect("Failed to install rustls crypto provider"); + + let (certs, key) = get_root_certificate_from_vault().await?; + + let mut roots = rustls::RootCertStore::empty(); + for c in certs.clone() { + roots.add(c)?; + } + + let client_verifier = WebPkiClientVerifier::builder(roots.into()) + .build() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + + let mut server_crypto = rustls::ServerConfig::builder() + .with_client_cert_verifier(client_verifier) + .with_single_cert(certs, key)?; + server_crypto.alpn_protocols = ALPN_QUIC_HTTP.iter().map(|&x| x.into()).collect(); + server_crypto.max_early_data_size = u32::MAX; + + let mut server_config = + quinn::ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(server_crypto)?)); + + let mut transport_config = TransportConfig::default(); + transport_config.max_idle_timeout(Some(IdleTimeout::from(VarInt::from_u32(300_000)))); + transport_config.keep_alive_interval(Some(Duration::from_secs(15))); + server_config.transport_config(transport_config.into()); + + Ok(server_config) +} + async fn handle_connection(conn: quinn::Incoming) -> Result<()> { let connection = conn.await?; let span = info_span!( @@ -139,12 +161,12 @@ async fn handle_receive( ) -> anyhow::Result<()> { let buffer_vec = recv.read_to_end(1024 * 10).await?; if buffer_vec.is_empty() { - println!("QUIC Received is empty"); + error!("QUIC Received is empty"); return Ok(()); } - let result = String::from_utf8_lossy(&*buffer_vec); + let result = String::from_utf8_lossy(&buffer_vec); - let data: ReceiveData = match serde_json::from_str(&*result) { + let data: ReceiveData = match serde_json::from_str(&result) { Ok(data) => data, Err(e) => { error!("QUIC Received Error:\n{:?}", e); @@ -168,7 +190,7 @@ async fn handle_receive( }; let json = serde_json::to_string(&sender_data)?; let (mut quic_send, _) = connection.clone().open_bi().await?; - quic_send.write_all(&json.as_bytes()).await?; + quic_send.write_all(json.as_bytes()).await?; quic_send.finish()?; } Action::Send => { @@ -190,7 +212,7 @@ async fn handle_receive( }; let json = serde_json::to_string(&sender_data)?; let (mut send, _) = connection.open_bi().await?; - send.write_all(&json.as_bytes()).await?; + send.write_all(json.as_bytes()).await?; send.finish()?; } Action::Call => { @@ -212,7 +234,7 @@ async fn handle_receive( }; let json = serde_json::to_string(&sender_data)?; let (mut send, _) = connection_to.open_bi().await?; - send.write_all(&json.as_bytes()).await?; + send.write_all(json.as_bytes()).await?; send.finish()?; } let from_connection = connection; @@ -238,7 +260,7 @@ async fn handle_receive( }; let json = serde_json::to_string(&sender_data)?; let (mut send, _) = connection.open_bi().await?; - send.write_all(&json.as_bytes()).await?; + send.write_all(json.as_bytes()).await?; send.finish()?; } REQ_ID_MAP.remove(data.req_id.as_str()); @@ -255,3 +277,12 @@ async fn handle_receive( Ok(()) } + +///Relay +pub async fn get_root_certificate_from_vault( +) -> anyhow::Result<(Vec>, PrivateKeyDer<'static>)> { + let cert = ca::server::get_root_cert_der().await; + let key = ca::server::get_root_key_der().await; + + Ok((vec![cert], key)) +} diff --git a/key.der b/key.der deleted file mode 100644 index ec58012813300300b5b1e8f569843a1de14d35d2..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 138 zcmV;50CoQ`frkPC05B5<2P%e0&OHJF1_&yKNX|V20S5$aFlzz<0R$jzepd$=ydslP zu8SG+_rRyTDK+OH!5kr7=M6k|*17+oL<2$q1Uaib>yCcdp7iU|COjK(s;`2ux)5`9 sKC~U6@C_hcdBeddE6&YOyP#&27oU5GjI-dJy)bx~lNOV=^~px{2mtXp3jhEB From 6795f785b72b47bb3859c026d063c287dc4627d6 Mon Sep 17 00:00:00 2001 From: wujian0327 <353981613@qq.com> Date: Mon, 24 Feb 2025 11:24:44 +0800 Subject: [PATCH 4/6] feature: add ca service for p2p --- gateway/src/https_server.rs | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/gateway/src/https_server.rs b/gateway/src/https_server.rs index f74cc763..0f86ccbc 100644 --- a/gateway/src/https_server.rs +++ b/gateway/src/https_server.rs @@ -61,13 +61,7 @@ pub async fn https_server(context: Context, options: HttpsOptions) { check_run_with_p2p(context.clone(), options.p2p.clone()); - let app = app( - context, - host.clone(), - https_port, - ztm.clone(), - ) - .await; + let app = app(context, host.clone(), https_port, p2p.clone()).await; let server_url = format!("{}:{}", host, https_port); let addr = SocketAddr::from_str(&server_url).unwrap(); @@ -89,13 +83,7 @@ pub async fn http_server(context: Context, options: HttpOptions) { check_run_with_p2p(context.clone(), options.p2p.clone()); - let app = app( - context, - host.clone(), - http_port, - ztm.clone(), - ) - .await; + let app = app(context, host.clone(), http_port, p2p.clone()).await; let server_url = format!("{}:{}", host, http_port); @@ -106,12 +94,7 @@ pub async fn http_server(context: Context, options: HttpOptions) { .unwrap(); } -pub async fn app( - context: Context, - host: String, - port: u16, - ztm: ZtmOptions, -) -> Router { +pub async fn app(context: Context, host: String, port: u16, p2p: P2pOptions) -> Router { let state = AppState { host, port, From 3f605f82e5c236a052931152db9ac5bd0f51890c Mon Sep 17 00:00:00 2001 From: jian_wu <353981613@qq.com> Date: Wed, 26 Feb 2025 12:11:40 +0800 Subject: [PATCH 5/6] Update gemini/src/ca/client.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- gemini/src/ca/client.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/gemini/src/ca/client.rs b/gemini/src/ca/client.rs index c818b81a..3f7f9660 100644 --- a/gemini/src/ca/client.rs +++ b/gemini/src/ca/client.rs @@ -25,18 +25,17 @@ pub async fn get_user_cert_from_ca(ca: String) -> Result { let url = format!("{ca}/api/v1/ca/certificates/{name}"); let url = add_http_to_url(url); let client = Client::new(); - let response = client.get(url.clone()).send().await.unwrap(); + let response = client.get(url.clone()).send().await?; if response.status().is_success() { //cert exists - return Ok(response.text().await.unwrap()); + return Ok(response.text().await?); } - let params = CertificateParams::new(vec![name]).unwrap(); + let params = CertificateParams::new(vec![name])?; let key = get_user_key().await; - let key = KeyPair::from_pem(&key).unwrap(); - let user_csr = params.serialize_request(&key).unwrap(); - + let key = KeyPair::from_pem(&key)?; + let user_csr = params.serialize_request(&key)?; //request a new cert let response = client .post(url) From e51f7462692c43d41c65b9ec8257d58943bfd111 Mon Sep 17 00:00:00 2001 From: jian_wu <353981613@qq.com> Date: Wed, 26 Feb 2025 12:11:59 +0800 Subject: [PATCH 6/6] Update gemini/src/ca/client.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- gemini/src/ca/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gemini/src/ca/client.rs b/gemini/src/ca/client.rs index 3f7f9660..f4f828b6 100644 --- a/gemini/src/ca/client.rs +++ b/gemini/src/ca/client.rs @@ -64,7 +64,7 @@ pub async fn get_ca_cert_from_ca(ca: String) -> Result { } fn add_http_to_url(url: String) -> String { - if url.starts_with("http://") { + if url.starts_with("http://") || url.starts_with("https://") { return url; }