diff --git a/forward-proxy/Cargo.toml b/forward-proxy/Cargo.toml index f627d8a..74ad7e4 100644 --- a/forward-proxy/Cargo.toml +++ b/forward-proxy/Cargo.toml @@ -22,3 +22,5 @@ once_cell = "1.21.3" pingora-error = "0.5.0" boring = "4.17.0" env_logger = "0.11.8" +utils = { path = "../utils" } +hex = "0.4.3" \ No newline at end of file diff --git a/forward-proxy/src/handler/consts.rs b/forward-proxy/src/handler/consts.rs index 8c50516..392260e 100644 --- a/forward-proxy/src/handler/consts.rs +++ b/forward-proxy/src/handler/consts.rs @@ -23,7 +23,20 @@ impl ForwardHeaderKeys { } const LAYER8_URL: &str = "http://127.0.0.1:5001"; -const RP_URL: &str = "http://127.0.0.1:6193"; +const RP_URL: &str = "http://127.0.0.1:6194"; + pub static RP_INIT_ENCRYPTED_TUNNEL_PATH: Lazy = Lazy::new(|| format!("{}/init-tunnel", RP_URL)); pub static RP_PROXY_PATH: Lazy = Lazy::new(|| format!("{}/proxy", RP_URL)); pub static LAYER8_GET_CERTIFICATE_PATH: Lazy = Lazy::new(|| format!("{}/sp-pub-key?backend_url=", LAYER8_URL)); + +pub const NTOR_SERVER_ID: &str = "ntor_server_id"; +pub const NTOR_SERVER_ID_TMP_VALUE: &str = "ReverseProxyServer"; +pub const NTOR_STATIC_PUBLIC_KEY: &str = "ntor_static_public_key"; +pub const NTOR_STATIC_PUBLIC_KEY_TMP_VALUE: [u8; 32] = [ + 131, 210, 36, 101, 39, 191, 61, 165, 29, 112, 94, 149, 120, 202, 189, 170, + 151, 62, 247, 71, 208, 255, 144, 173, 52, 223, 239, 221, 153, 225, 40, 10 +]; + +pub const INIT_TUNNEL_ENDPOINT: &str = "/init-tunnel"; +pub const PROXY_ENDPOINT: &str = "/proxy"; +pub const HEALTHCHECK_ENDPOINT: &str = "/healthcheck"; \ No newline at end of file diff --git a/forward-proxy/src/handler/helpers.rs b/forward-proxy/src/handler/helpers.rs new file mode 100644 index 0000000..1df5dd9 --- /dev/null +++ b/forward-proxy/src/handler/helpers.rs @@ -0,0 +1,28 @@ +use serde::Serialize; +use chrono::Utc; +use jsonwebtoken::{encode, EncodingKey, Header}; + +// Get SECRET_KEY from environment variable +pub fn get_secret_key() -> String { + std::env::var("JWT_SECRET_KEY").expect("JWT_SECRET_KEY must be set") +} + +#[derive(Serialize)] +struct Claims { + exp: usize, +} + +pub fn generate_standard_token(secret_key: &str) -> pingora::Result> { + let now = Utc::now(); + let claims = Claims { + exp: (now + chrono::Duration::days(1)).timestamp() as usize, + }; + + let token = encode( + &Header::new(jsonwebtoken::Algorithm::HS256), + &claims, + &EncodingKey::from_secret(secret_key.as_bytes()), + )?; + + Ok(token) +} diff --git a/forward-proxy/src/handler/mod.rs b/forward-proxy/src/handler/mod.rs index d31a471..e07b55f 100644 --- a/forward-proxy/src/handler/mod.rs +++ b/forward-proxy/src/handler/mod.rs @@ -1,40 +1,39 @@ -use log::{debug, error, info}; +use log::{error, info}; use pingora::http::StatusCode; -use reqwest::{Client, Response}; +use reqwest::{Client}; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait}; -use crate::handler::types::response::{ErrorResponse, FpHealthcheckError, FpHealthcheckSuccess, InitEncryptedTunnelResponse, ProxyResponse}; +use crate::handler::types::response::{ErrorResponse, FpHealthcheckError, FpHealthcheckSuccess, InitTunnelResponseFromRP, InitTunnelResponseToINT}; use pingora_router::handler::ResponseBodyTrait; -use reqwest::header::HeaderMap; -use crate::handler::consts::ForwardHeaderKeys::{FpHeaderRequestKey, FpHeaderResponseKey}; -use crate::handler::consts::{LAYER8_GET_CERTIFICATE_PATH, RP_INIT_ENCRYPTED_TUNNEL_PATH, RP_PROXY_PATH}; -use crate::handler::types::request::{InitEncryptedTunnelRequest, ProxyRequest}; +use crate::handler::types::request::{InitTunnelRequest}; +use utils; pub mod types; -mod utils; +mod helpers; mod consts; -pub struct ForwardHandler { - // add later -} +pub struct ForwardHandler {} impl DefaultHandlerTrait for ForwardHandler {} -type NTorPublicKey = Vec; +struct NTorServerCertificate { + server_id: String, + public_key: Vec, +} impl ForwardHandler { async fn get_public_key( &self, backend_url: String, - ctx: &mut Layer8Context - ) -> Result + ctx: &mut Layer8Context, + ) -> Result { - let secret_key = utils::get_secret_key(); - let token = utils::generate_standard_token(&secret_key).unwrap(); + let secret_key = helpers::get_secret_key(); + let token = self::helpers::generate_standard_token(&secret_key).unwrap(); let client = Client::new(); return match client - .get(format!("{}{}", LAYER8_GET_CERTIFICATE_PATH.as_str(), backend_url)) + .get(format!("{}{}", consts::LAYER8_GET_CERTIFICATE_PATH.as_str(), backend_url)) .header("Authorization", format!("Bearer {}", token)) .send() .await @@ -54,7 +53,10 @@ impl ForwardHandler { }) } else { // todo extract public key from response - Ok(vec![]) + Ok(NTorServerCertificate { + server_id: "".to_string(), + public_key: vec![], + }) } } Err(e) => { @@ -70,149 +72,11 @@ impl ForwardHandler { }; } - /// Add response headers to `ctx` to respond to Interceptor: - /// - *Copy* ReverseProxy's response header in `headers` - /// - *Add* custom ForwardProxy's response headers `custom_header` - fn create_response_headers( - headers: HeaderMap, - ctx: &mut Layer8Context, - custom_header: &str - ) { - for (key, val) in headers.iter() { - if let (k, Ok(v)) = (key.to_string(), val.to_str()) { - ctx.insert_response_header(k.as_str(), v); - } - } - - ctx.insert_response_header( - FpHeaderResponseKey.as_str(), - custom_header - ) - } - - /// Create request header to send/forward to ReverseProxy: - /// - *Copy* origin request headers from Interceptor `ctx` - /// - *Add* custom ForwardProxy's request headers `custom_header` - /// - *Set* universal Content-Type and Content-Length - fn create_forward_request_headers( - ctx: &mut Layer8Context, - custom_header: &str, - content_length: usize - ) -> HeaderMap { - // copy all origin header to new request - let origin_headers = ctx.get_request_header().clone(); - let mut reqwest_header = utils::to_reqwest_header(origin_headers); - - // add forward proxy header `fp_request_header` - reqwest_header.insert( - FpHeaderRequestKey.as_str(), - custom_header.parse().unwrap(), - ); - - reqwest_header.insert("Content-Length", content_length.to_string().parse().unwrap()); - reqwest_header.insert("Content-Type", "application/json".parse().unwrap()); - - reqwest_header - } - - /// forward manipulated `init-encrypted-tunnel` request to ReverseProxy and handle success response - async fn init_tunnel_forward_to_rp( - ctx: &mut Layer8Context, - headers: HeaderMap, - body: Vec, - ) -> APIHandlerResponse - { - let body_string = utils::bytes_to_string(&body); - let log_meta = format!("[FORWARD {}]", RP_INIT_ENCRYPTED_TUNNEL_PATH.as_str()); - info!("{log_meta} request headers to RP: {:?}", headers); - info!("{log_meta} request body to RP: {:?}", body_string); - - let client = Client::new(); - let response = client.post(RP_INIT_ENCRYPTED_TUNNEL_PATH.as_str()) - .headers(headers) - .body(body_string) - .send() - .await; - - match response { - Ok(res) if res.status().is_success() => { - let headers = res.headers().clone(); - let rp_response_body = res.bytes().await.unwrap_or_default(); - info!("{log_meta} response headers from RP: {:?}", headers); - info!("{log_meta} response body from RP: {}", utils::bytes_to_string(&rp_response_body.to_vec())); - - // validate reverse proxy response format, is it necessary? - return match utils::bytes_to_json::(rp_response_body.to_vec()) { - Err(e) => { - error!("Error parsing RP response: {:?}", e); - APIHandlerResponse { - status: StatusCode::INTERNAL_SERVER_ERROR, - body: None, - } - } - _ => { - // forward ReverseProxy's headers - ForwardHandler::create_response_headers(headers, ctx, FpHeaderResponseKey.placeholder_value()); - - APIHandlerResponse { - status: StatusCode::OK, - body: Some(rp_response_body.to_vec()), // forward reverse proxy's response - } - } - }; - } - _ => {} - }; - - ForwardHandler::handle_failed_forward_response(log_meta, response).await - } - - /// handle failed forward requests (to ReverseProxy) - async fn handle_failed_forward_response( - log_meta: String, - response: Result - ) -> APIHandlerResponse { - match response { - Ok(res) => { - // Handle 4xx/5xx errors - let status = res.status(); - error!("{log_meta} RP Response: {:?}", res); - - let error_body = match res.content_length() { - None => "internal-server-error".to_string(), - Some(_) => { - res.text().await.unwrap_or_else(|_e| "".to_string()) - } - }; - - let response_bytes = ErrorResponse { - error: error_body - }.to_bytes(); - - APIHandlerResponse { - status: StatusCode::try_from(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), - body: Some(response_bytes), - } - } - Err(e) => { - error!("Failed to forward request to RP: {}", e); - - let response_body_bytes = ErrorResponse { - error: e.to_string(), - }.to_bytes(); - - APIHandlerResponse { - status: StatusCode::INTERNAL_SERVER_ERROR, - body: Some(response_body_bytes), - } - } - } - } - - pub async fn handle_init_encrypted_tunnel(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { + /// Validate request body and get ntor certificate for the given backend URL. + pub async fn handle_init_tunnel_request(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { // validate request body let received_body = match ForwardHandler::parse_request_body::< - InitEncryptedTunnelRequest, + InitTunnelRequest, ErrorResponse >(&ctx.get_request_body()) { @@ -236,101 +100,60 @@ impl ForwardHandler { None => "".to_string() }; - //todo handle result_public_key - let _result_public_key = self.get_public_key(backend_url.to_string(), ctx).await; + //todo handle result_certificate, if no certificate found, return error + let _result_certificate = self.get_public_key(backend_url.to_string(), ctx).await; // println!("public_key: {:?}", result_public_key); - - // copy origin headers and add ForwardProxy header - let new_headers = ForwardHandler::create_forward_request_headers( - ctx, - FpHeaderRequestKey.placeholder_value(), - received_body.len() + ctx.set( + consts::NTOR_SERVER_ID.to_string(), + consts::NTOR_SERVER_ID_TMP_VALUE.to_string(), // replace with real value + ); + ctx.set( + consts::NTOR_STATIC_PUBLIC_KEY.to_string(), + hex::encode(consts::NTOR_STATIC_PUBLIC_KEY_TMP_VALUE), // replace with real value ); - // forward origin request body - ForwardHandler::init_tunnel_forward_to_rp(ctx, new_headers, received_body).await + APIHandlerResponse { + status: StatusCode::OK, + body: Some(received_body), + } } - /// forward manipulated `proxy` request to ReverseProxy and handle success response - async fn proxy_forward_to_rp( - ctx: &mut Layer8Context, - headers: HeaderMap, - body: Vec, - ) -> APIHandlerResponse - { - let body_string = utils::bytes_to_string(&body); - let log_meta = format!("[FORWARD {}]", RP_PROXY_PATH.as_str()); - debug!("{log_meta} request headers to RP: {:?}", headers); - debug!("{log_meta} request body to RP: {}", body_string); + pub fn handle_init_tunnel_response(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { + let ntor_server_id = ctx.get(&*consts::NTOR_SERVER_ID.to_string()).unwrap().clone(); + let ntor_static_public_key = hex::decode( + ctx.get(&*consts::NTOR_STATIC_PUBLIC_KEY.to_string()).clone().unwrap() + ).unwrap(); - let client = Client::new(); - let response = client - .post(RP_PROXY_PATH.as_str()) - .headers(headers) - .body(body_string) - .send() - .await; + let response_body = ctx.get_response_body(); - match response { - Ok(res) if res.status().is_success() => { - let headers = res.headers().clone(); - let rp_response_bytes = res.bytes().await.unwrap_or_default(); - debug!("{log_meta} response headers from RP: {:?}", headers); - debug!("{log_meta} response body from RP: {}", utils::bytes_to_string(&rp_response_bytes.to_vec())); - - // validate reverse proxy's response body format, is it necessary? - match utils::bytes_to_json::(rp_response_bytes.to_vec()) { - Err(err) => { - error!("Reverse Proxy's response mismatch: {:}", err); - return APIHandlerResponse { - status: StatusCode::INTERNAL_SERVER_ERROR, - body: None, - }; - } - Ok(_) => {} + return match utils::bytes_to_json::(response_body) { + Err(e) => { + error!("Error parsing RP response: {:?}", e); + APIHandlerResponse { + status: StatusCode::INTERNAL_SERVER_ERROR, + body: None, } - - // forward ReverseProxy's headers - ForwardHandler::create_response_headers(headers, ctx, FpHeaderResponseKey.placeholder_value()); - - return APIHandlerResponse { - status: StatusCode::OK, - body: Some(rp_response_bytes.to_vec()), // forward ReverseProxy's response - }; } - _ => {} - }; - - ForwardHandler::handle_failed_forward_response(log_meta, response).await - } + Ok(res_from_rp) => { + // forward ReverseProxy's headers - pub async fn handle_proxy(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { - // validate request body - let received_body = match ForwardHandler:: - parse_request_body::(&ctx.get_request_body()) - { - Ok(body) => body.to_bytes(), - Err(err) => { - let body = match err { - None => None, - Some(body) => Some(body.to_bytes()) + let res_to_int = InitTunnelResponseToINT { + ephemeral_public_key: res_from_rp.public_key, + t_b_hash: res_from_rp.t_b_hash, + session_id: res_from_rp.session_id, + static_public_key: ntor_static_public_key, + server_id: ntor_server_id, }; - return APIHandlerResponse { - status: StatusCode::BAD_REQUEST, - body, - }; + APIHandlerResponse { + status: StatusCode::OK, + body: Some(res_to_int.to_bytes()), + } } }; - - let new_headers = ForwardHandler::create_forward_request_headers( - ctx, FpHeaderRequestKey.placeholder_value(), received_body.len()); - - // send new request to ReverseProxy - ForwardHandler::proxy_forward_to_rp(ctx, new_headers, received_body).await } - pub async fn handle_healthcheck(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { + pub fn handle_healthcheck(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { let error = ctx.param("error").unwrap(); if error == "true" { diff --git a/forward-proxy/src/handler/types/request.rs b/forward-proxy/src/handler/types/request.rs index dd4c206..a0977f7 100644 --- a/forward-proxy/src/handler/types/request.rs +++ b/forward-proxy/src/handler/types/request.rs @@ -2,11 +2,11 @@ use serde::{Deserialize, Serialize}; use pingora_router::handler::RequestBodyTrait; #[derive(Serialize, Deserialize, Debug)] -pub struct InitEncryptedTunnelRequest { - pub int_request_body: String, +pub struct InitTunnelRequest { + pub public_key: Vec, } -impl RequestBodyTrait for InitEncryptedTunnelRequest {} +impl RequestBodyTrait for InitTunnelRequest {} #[derive(Serialize, Deserialize, Debug)] pub struct ProxyRequest { diff --git a/forward-proxy/src/handler/types/response.rs b/forward-proxy/src/handler/types/response.rs index 1df4d5f..d879571 100644 --- a/forward-proxy/src/handler/types/response.rs +++ b/forward-proxy/src/handler/types/response.rs @@ -16,11 +16,24 @@ impl ResponseBodyTrait for ErrorResponse { } #[derive(Serialize, Deserialize, Debug)] -pub struct InitEncryptedTunnelResponse { // this struct should match ReverseProxy's Response - pub rp_response_body: String, +pub struct InitTunnelResponseFromRP { // this struct should match ReverseProxy's Response + pub public_key: Vec, + pub t_b_hash: Vec, + pub session_id: String, +} + +impl ResponseBodyTrait for InitTunnelResponseFromRP {} + +#[derive(Serialize, Deserialize, Debug)] +pub struct InitTunnelResponseToINT { // this struct should match Interceptor's expected Response + pub ephemeral_public_key: Vec, + pub t_b_hash: Vec, + pub session_id: String, + pub static_public_key: Vec, + pub server_id: String } -impl ResponseBodyTrait for InitEncryptedTunnelResponse {} +impl ResponseBodyTrait for InitTunnelResponseToINT {} #[derive(Serialize, Deserialize, Debug)] pub struct ProxyResponse { diff --git a/forward-proxy/src/handler/utils.rs b/forward-proxy/src/handler/utils.rs deleted file mode 100644 index 1e3b0b8..0000000 --- a/forward-proxy/src/handler/utils.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::collections::HashMap; -use chrono::Utc; -use jsonwebtoken::{encode, EncodingKey, Header}; -use reqwest::header::HeaderMap; -use serde::{Deserialize, Serialize}; - -// Get SECRET_KEY from environment variable -pub fn get_secret_key() -> String { - std::env::var("JWT_SECRET_KEY").expect("JWT_SECRET_KEY must be set") -} - -#[derive(Serialize)] -struct Claims { - exp: usize, -} - -pub fn generate_standard_token(secret_key: &str) -> pingora::Result> { - let now = Utc::now(); - let claims = Claims { - exp: (now + chrono::Duration::days(1)).timestamp() as usize, - }; - - let token = encode( - &Header::new(jsonwebtoken::Algorithm::HS256), - &claims, - &EncodingKey::from_secret(secret_key.as_bytes()), - )?; - - Ok(token) -} - -pub fn bytes_to_json Deserialize<'de>>(bytes: Vec) -> Result { - serde_json::from_slice::(&bytes) -} - -pub fn to_reqwest_header(map: HashMap) -> HeaderMap { - let mut header_map = HeaderMap::new(); - for (k, v) in map { - if let Ok(header_name) = reqwest::header::HeaderName::try_from(k.as_str()) { - if let Ok(header_value) = reqwest::header::HeaderValue::from_str(&v) { - header_map.insert(header_name, header_value); - } - } - } - header_map -} - -pub fn bytes_to_string(bytes: &Vec) -> String { - String::from_utf8_lossy(bytes).to_string() -} \ No newline at end of file diff --git a/forward-proxy/src/main.rs b/forward-proxy/src/main.rs index 43ecacf..381da1b 100644 --- a/forward-proxy/src/main.rs +++ b/forward-proxy/src/main.rs @@ -1,15 +1,11 @@ -mod handler; mod proxy; +mod handler; use crate::handler::ForwardHandler; use env_logger::{Env, Target}; -use futures::FutureExt; use log::info; -use pingora::prelude::*; -use pingora_router::handler::APIHandler; -use pingora_router::router::Router; use proxy::ForwardProxy; -use std::sync::Arc; +use pingora::prelude::*; fn main() { // Load environment variables from .env file @@ -18,7 +14,8 @@ fn main() { // let log_file = fs::File::create("log.txt").expect("Failed to create log file"); // let config = ConfigBuilder::new().set_time_to_local(true).build(); // WriteLogger::init(LevelFilter::Debug, config, log_file).expect("Failed to initialize logger"); - env_logger::Builder::from_env(Env::default().write_style_or("RUST_LOG_STYLE", "always")) + env_logger::Builder::from_env(Env::default() + .write_style_or("RUST_LOG_STYLE", "always")) .format_file(true) .format_line_number(true) .target(Target::Stdout) @@ -29,34 +26,19 @@ fn main() { let mut server = Server::new(Some(Opt { conf: std::env::var("SERVER_CONF").ok(), ..Default::default() - })) - .unwrap(); + })).unwrap(); server.bootstrap(); - let fp_handler = Arc::new(ForwardHandler {}); - let mut router: Router> = Router::new(fp_handler.clone()); - - let handle_init_tunnel: APIHandler> = - Box::new(|h, ctx| async move { h.handle_init_encrypted_tunnel(ctx).await }.boxed()); + let fp_handler = ForwardHandler{}; - let handle_proxy: APIHandler> = - Box::new(|h, ctx| async move { h.handle_proxy(ctx).await }.boxed()); - - let handle_healthcheck: APIHandler> = - Box::new(|h, ctx| async move { h.handle_healthcheck(ctx).await }.boxed()); - - router.post( - "/init-tunnel?backend_url={}".to_string(), - Box::new([handle_init_tunnel]), - ); - router.post("/proxy".to_string(), Box::new([handle_proxy])); - router.get( - "/healthcheck?error={}".to_string(), - Box::new([handle_healthcheck]), + let mut proxy = http_proxy_service( + &server.configuration, + ForwardProxy::new(fp_handler) ); - let mut proxy = http_proxy_service(&server.configuration, ForwardProxy::new(router)); proxy.add_tcp("localhost:6191"); + server.add_service(proxy); + server.run_forever(); } diff --git a/forward-proxy/src/proxy.rs b/forward-proxy/src/proxy.rs index 254ef0d..df3d7c2 100644 --- a/forward-proxy/src/proxy.rs +++ b/forward-proxy/src/proxy.rs @@ -1,30 +1,37 @@ use std::sync::Arc; - +use std::time::Duration; use async_trait::async_trait; use boring::x509::X509; use bytes::Bytes; -use log::info; +use log::{error, info}; +use pingora::Error; +use pingora::prelude::{HttpPeer, ProxyHttp, Session}; +use pingora::http::{RequestHeader, ResponseHeader, StatusCode}; use pingora::upstreams::peer::PeerOptions; use pingora::utils::tls::CertKey; use pingora::OrErr; -use pingora::http::{ResponseHeader, StatusCode}; use pingora::listeners::tls::TLS_CONF_ERR; -use pingora::prelude::{HttpPeer, ProxyHttp, Session}; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; -use pingora_router::router::Router; +use reqwest::header::TRANSFER_ENCODING; +use reqwest::Method; +use crate::handler::ForwardHandler; -pub struct ForwardProxy { - router: Router, +pub struct ForwardProxy { + handler: ForwardHandler, } -impl ForwardProxy { - pub fn new(router: Router) -> Self { - ForwardProxy { router } +impl ForwardProxy { + pub fn new(handler: ForwardHandler) -> Self { + ForwardProxy { + handler, + } } } +/// To see the order of execution and how the request is processed, refer to the documentation +/// see https://github.com/cloudflare/pingora/blob/main/docs/user_guide/phase.md #[async_trait] -impl ProxyHttp for ForwardProxy { +impl ProxyHttp for ForwardProxy { type CTX = Layer8Context; fn new_ctx(&self) -> Self::CTX { @@ -92,69 +99,158 @@ impl ProxyHttp for ForwardProxy { Self::CTX: Send + Sync, { // create Context - // ctx.update(session).await?; - // let request_summary = format!( - // "{} {}", - // session.req_header().method, - // session.req_header().uri.to_string() - // ); - // println!(); - // info!("[REQUEST {}] {:?}", request_summary, ctx.request); - // info!( - // "[REQUEST {}] Decoded body: {}", - // request_summary, - // String::from_utf8_lossy(&*ctx.get_request_body()) - // ); - // println!(); - - // let handler_response = self.router.call_handler(ctx).await; - // if handler_response.status == StatusCode::NOT_FOUND && handler_response.body == None { - // return Ok(false); - // } - - // // set headers - // let mut header = ResponseHeader::build(handler_response.status, None)?; - // let response_header = ctx.get_response_header().clone(); - // for (key, val) in response_header.iter() { - // header.insert_header(key.clone(), val.clone()).unwrap(); - // } - - // let mut response_bytes = vec![]; - // if let Some(body_bytes) = handler_response.body { - // header - // .insert_header("Content-length", &body_bytes.len().to_string()) - // .unwrap(); - // response_bytes = body_bytes; - // }; - - // debug!("HEADERS: {:?}", header.headers); - - // session.write_response_header_ref(&header).await?; - - // println!(); - // info!( - // "[RESPONSE {}] Header: {:?}", - // request_summary, header.headers - // ); - // info!( - // "[RESPONSE {}] Body: {}", - // request_summary, - // String::from_utf8_lossy(&*response_bytes) - // ); - // println!(); - - // Write the response body to the session after setting headers - - // let response_bytes = if response_bytes.is_empty() { - // None - // } else { - // Some(Bytes::from(response_bytes)) - // }; - - // session.write_response_body(response_bytes, true).await?; + ctx.update(session).await?; + let request_summary = session.request_summary(); + println!(); + info!("[REQUEST {}] {:?}", request_summary, ctx.request); + println!(); + + match session.req_header().method { + pingora::http::Method::OPTIONS => { + // Handle CORS preflight request + let header = ResponseHeader::build(StatusCode::NO_CONTENT, None)?; + session.write_response_header_ref(&header).await?; + session.set_keepalive(None); + return Ok(true); + } + _ => {} + } + + match ( + session.req_header().uri.path(), + session.req_header().method.as_str() + ) { + ("/healthcheck", "GET") => { + let handler_response = self.handler.handle_healthcheck(ctx); + let mut header = ResponseHeader::build(handler_response.status, None)?; + let response_headers = header.headers.clone(); + for (key, val) in response_headers.iter() { + header.insert_header(key.clone(), val.clone()).unwrap(); + }; + + let mut response_bytes = vec![]; + if let Some(body_bytes) = handler_response.body { + header + .insert_header("Content-length", &body_bytes.len().to_string()) + .unwrap(); + response_bytes = body_bytes; + }; + + session.write_response_header_ref(&header).await?; + + println!(); + info!("[RESPONSE {}] Header: {:?}", request_summary, header.headers); + info!( + "[RESPONSE {}] Body: {}", + request_summary, + String::from_utf8_lossy(&*response_bytes) + ); + println!(); + + // Write the response body to the session after setting headers + session + .write_response_body(Some(Bytes::from(response_bytes)), true) + .await?; + + return Ok(true); + } + ("/init-tunnel", "POST") => {} + ("/proxy", "POST") => {} + _ => { + let header = ResponseHeader::build(StatusCode::NOT_FOUND, None)?; + session.write_response_header_ref(&header).await?; + session.set_keepalive(None); + return Ok(true); + } + } + Ok(false) } + async fn request_body_filter( + &self, + session: &mut Session, + body: &mut Option, + end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> pingora::Result<()> + where + Self::CTX: Send + Sync, + { + if let Some(b) = body { + ctx.extend_request_body(b.to_vec()); + // drop the body + b.clear(); + } + + if end_of_stream { + info!( + "[REQUEST {}] Decoded body: {}", + session.request_summary(), + String::from_utf8_lossy(&*ctx.get_request_body()), + ); + + // This is the last chunk, we can process the data now + + let handler_response = match session.req_header().uri.path() { + "/init-tunnel" => self.handler.handle_init_tunnel_request(ctx).await, + _ => { + info!( + "[FORWARD {}] FP forward request body: {}", + session.request_summary(), + utils::bytes_to_string(&ctx.get_request_body()) + ); + *body = Some(Bytes::copy_from_slice(ctx.get_request_body().as_slice())); + return Ok(()); + } + }; + + if handler_response.status != StatusCode::OK { + error!( + "[FORWARD {}] Error in request handler with status: {}, error: {}", + session.request_summary(), + handler_response.status, + utils::bytes_to_string(&handler_response.body.unwrap_or_default()) + ); + return Err(pingora::Error::new( + pingora::ErrorType::HTTPStatus(u16::from(handler_response.status)), + )); + } + + info!( + "[FORWARD {}] Request handler response: status: {}, body: {}", + session.request_summary(), + handler_response.status, + utils::bytes_to_string(&handler_response.body.as_ref().unwrap_or(&vec![])) + ); + let fp_req_body = handler_response.body.as_ref().unwrap_or(&vec![]).clone(); + + *body = Some(Bytes::copy_from_slice(fp_req_body.as_slice())); + } + + Ok(()) + } + + async fn upstream_request_filter( + &self, + _session: &mut Session, + upstream_request: &mut RequestHeader, + _ctx: &mut Self::CTX, + ) -> pingora::Result<()> + where + Self::CTX: Send + Sync, + { + upstream_request // is this still needed? + .insert_header("fp_request_header", "fp_request_value") + .unwrap(); + + upstream_request + .insert_header(TRANSFER_ENCODING.as_str(), "chunked") + .unwrap(); + + Ok(()) + } + async fn response_filter( &self, _session: &mut Session, @@ -162,10 +258,88 @@ impl ProxyHttp for ForwardProxy { _ctx: &mut Self::CTX, ) -> pingora::Result<()> { upstream_response.insert_header("Access-Control-Allow-Origin", "*")?; - upstream_response.insert_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")?; - upstream_response.insert_header("Access-Control-Allow-Headers", "Content-Type")?; + upstream_response.insert_header("Access-Control-Allow-Methods", "POST")?; + upstream_response.insert_header("Access-Control-Allow-Headers", "*")?; + upstream_response.insert_header(TRANSFER_ENCODING.as_str(), "chunked")?; + Ok(()) } + + fn response_body_filter( + &self, + session: &mut Session, + body: &mut Option, + end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> pingora::Result> + where + Self::CTX: Send + Sync, + { + if let Some(b) = body { + ctx.extend_response_body(b.to_vec()); + // drop the body + b.clear(); + } + + if end_of_stream { + // This is the last chunk, we can process the data now + info!( + "[FORWARD {}] RP Response decoded body: {}", + session.request_summary(), + String::from_utf8_lossy(&*ctx.get_response_body()), + ); + + let handler_response = match session.req_header().uri.path() { + "/init-tunnel" => self.handler.handle_init_tunnel_response(ctx), + _ => { + info!( + "[RESPONSE {}] FP forward response body: {}", + session.request_summary(), + utils::bytes_to_string(&ctx.get_response_body()) + ); + *body = Some(Bytes::copy_from_slice(ctx.get_response_body().as_slice())); + return Ok(None); + } + }; + + if handler_response.status != StatusCode::OK { + error!( + "[RESPONSE {}] Error in response handler with status: {}, error: {}", + session.request_summary(), + handler_response.status, + utils::bytes_to_string(&handler_response.body.unwrap_or_default()) + ); + return Err(pingora::Error::new( + pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::INTERNAL_SERVER_ERROR) + ), + )); + } + + info!( + "[RESPONSE {}] FP response with status: {}, body: {}", + session.request_summary(), + handler_response.status, + utils::bytes_to_string(&handler_response.body.as_ref().unwrap_or(&vec![])) + ); + let fp_res_body = handler_response.body.as_ref().unwrap_or(&vec![]).clone(); + + *body = Some(Bytes::copy_from_slice(fp_res_body.as_slice())); + } + + Ok(None) + } + + fn fail_to_connect( + &self, + _session: &mut Session, + _peer: &HttpPeer, + _ctx: &mut Self::CTX, + e: Box, + ) -> Box { + error!("Failed to connect to upstream: {}", e); + e + } } mod certs { diff --git a/pingora-router/src/ctx.rs b/pingora-router/src/ctx.rs index b3e622c..e1e098f 100644 --- a/pingora-router/src/ctx.rs +++ b/pingora-router/src/ctx.rs @@ -83,11 +83,6 @@ impl Layer8Context { pub async fn update(&mut self, session: &mut Session) -> pingora::Result { self.request.summary = Layer8ContextRequestSummary::from(session); - match get_request_body(session).await { - Ok(body) => self.request.body = body, - Err(err) => return Err(err) - }; - self.set_request_header(session.req_header().clone()); // take anything as needed later @@ -95,6 +90,14 @@ impl Layer8Context { Ok(true) } + pub async fn read_request_body(&mut self, session: &mut Session) -> pingora::Result { + match get_request_body(session).await { + Ok(body) => self.request.body = body, + Err(err) => return Err(err) + }; + Ok(true) + } + } impl Layer8ContextTrait for Layer8Context { @@ -139,6 +142,10 @@ impl Layer8ContextTrait for Layer8Context { self.request.body = body } + fn extend_request_body(&mut self, body: Vec) { + self.request.body.extend(body) + } + fn get_request_body(&self) -> Vec { self.request.body.clone() } @@ -147,6 +154,10 @@ impl Layer8ContextTrait for Layer8Context { self.response.body = body } + fn extend_response_body(&mut self, body: Vec) { + self.response.body.extend(body); + } + fn get_response_body(&self) -> Vec { self.response.body.clone() } @@ -178,8 +189,10 @@ pub trait Layer8ContextTrait { fn remove_response_header(&mut self, key: &str) -> Option; fn get_response_header(&self) -> &Layer8Header; fn set_request_body(&mut self, body: Vec); + fn extend_request_body(&mut self, body: Vec); fn get_request_body(&self) -> Vec; fn set_response_body(&mut self, body: Vec); + fn extend_response_body(&mut self, body: Vec); fn get_response_body(&self) -> Vec; fn get(&self, key: &str) -> Option<&String>; fn set(&mut self, key: String, value: String); diff --git a/reverse-proxy/Cargo.toml b/reverse-proxy/Cargo.toml index d4207d4..16e1bcc 100644 --- a/reverse-proxy/Cargo.toml +++ b/reverse-proxy/Cargo.toml @@ -21,3 +21,9 @@ futures = "0.3.31" boring = "4.17.0" once_cell = "1.21.3" dotenv = "0.15.0" +ntor = { git = "https://github.com/globe-and-citizen/ntor.git", tag = "0.1.1"} +#ntor = { path = "../../../ntor" } +config = "0.15.11" +toml = "0.8.23" +uuid = { version = "1.16.0", features = ["v4"] } +utils = { path = "../utils" } diff --git a/reverse-proxy/config.toml b/reverse-proxy/config.toml new file mode 100644 index 0000000..c7dbdd7 --- /dev/null +++ b/reverse-proxy/config.toml @@ -0,0 +1,16 @@ +[upstream] +host="localhost" +port=0 + +[server] +local_address="127.0.0.1:6194" +public_address="0.0.0.0:6193" + +[log] +level="DEBUG" +path="console" + +[handler] +#jwt_secret="this is 32-byte wgp's jwt secret" +ntor_server_id="ReverseProxyServer" +ntor_static_secret="this is 32-byte nTorStaticSecret" diff --git a/reverse-proxy/src/config.rs b/reverse-proxy/src/config.rs new file mode 100644 index 0000000..9b1a0a8 --- /dev/null +++ b/reverse-proxy/src/config.rs @@ -0,0 +1,66 @@ +use std::fs; +use serde::Deserialize; +use toml; + +#[derive(Debug, Deserialize)] +pub struct Config { + pub upstream: UpstreamConfig, + pub log: LogConfig, + pub server: ServerConfig, + pub handler: HandlerConfig +} + +impl Config { + /// panic if unable to validate. + /// assuming after this validation, all configs are valid + pub fn validate(&self) { + // todo + } +} + +impl Config { + pub fn from_file(path: &str) -> Self { + let content = fs::read_to_string(path).expect("Failed to read configuration file"); + toml::from_str(&content).expect("Failed to parse configuration file") + } +} + +#[derive(Debug, Deserialize)] +pub(super) struct UpstreamConfig { + pub host: String, + pub port: u16, +} + +#[derive(Debug, Deserialize)] +pub(super) struct LogConfig { + pub path: String, + pub level: String, +} + +impl LogConfig { + pub fn to_level_filter(&self) -> log::LevelFilter { + match self.level.to_uppercase().as_str() { + "INFO" => log::LevelFilter::Info, + "DEBUG" => log::LevelFilter::Debug, + "WARNING" => log::LevelFilter::Warn, + "ERROR" => log::LevelFilter::Error, + "TRACE" => log::LevelFilter::Trace, + "OFF" => log::LevelFilter::Off, + _ => log::max_level() + } + } +} + +#[derive(Debug, Deserialize)] +pub(super) struct ServerConfig { + pub local_address: String, + pub public_address: String +} + +#[derive(Debug, Deserialize)] +pub(super) struct HandlerConfig { + pub ntor_server_id: String, + pub ntor_static_secret: String, +} + + diff --git a/reverse-proxy/src/handler/consts.rs b/reverse-proxy/src/handler/common/consts.rs similarity index 80% rename from reverse-proxy/src/handler/consts.rs rename to reverse-proxy/src/handler/common/consts.rs index 218a31c..2b6580a 100644 --- a/reverse-proxy/src/handler/consts.rs +++ b/reverse-proxy/src/handler/common/consts.rs @@ -8,6 +8,7 @@ pub enum HeaderKeys { IntHeaderRequestKey, FpHeaderRequestKey, BeHeaderResponseKey, + NTorSessionIDKey } impl HeaderKeys { @@ -19,6 +20,7 @@ impl HeaderKeys { HeaderKeys::BeHeaderResponseKey => "be_response_header", HeaderKeys::FpHeaderRequestKey => "fp_request_header", HeaderKeys::IntHeaderRequestKey => "int_request_header", + HeaderKeys::NTorSessionIDKey => "ntor-session-id" } } @@ -31,8 +33,7 @@ impl HeaderKeys { } } -// fixme BE path should be taken from configuration -const BACKEND_URL: &str = "http://localhost:3000"; +// fixme BE path should be taken` from configuration +pub const BACKEND_HOST: &str = "http://localhost:3000"; pub static INIT_TUNNEL_TO_BACKEND_PATH: Lazy = - Lazy::new(|| format!("{}/init-tunnel", BACKEND_URL)); -pub static PROXY_TO_BACKEND_PATH: Lazy = Lazy::new(|| format!("{}/proxy", BACKEND_URL)); + Lazy::new(|| format!("{}/init-tunnel", BACKEND_HOST)); \ No newline at end of file diff --git a/reverse-proxy/src/handler/common/handler.rs b/reverse-proxy/src/handler/common/handler.rs new file mode 100644 index 0000000..6fd4f18 --- /dev/null +++ b/reverse-proxy/src/handler/common/handler.rs @@ -0,0 +1,60 @@ +use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; +use reqwest::header::HeaderMap; +use crate::handler::common::consts::HeaderKeys::{FpHeaderRequestKey, IntHeaderRequestKey, RpHeaderRequestKey, RpHeaderResponseKey}; +use utils::{to_reqwest_header}; + +/// Struct containing only associated methods (no instance methods or fields). +/// The contents are quite drafting, but the idea is to handle common operations +pub struct CommonHandler {} + +impl CommonHandler { + /// Add response headers to `ctx` to respond to FP: + /// - *Copy* Backend's response header in `headers` - *update* `Content-Length` + /// - *Add* custom ReverseProxy's response headers `custom_header` + pub fn create_response_headers( + headers: HeaderMap, + ctx: &mut Layer8Context, + custom_header: &str, + content_length: usize, + ) { + for (key, val) in headers.iter() { + if let (k, Ok(v)) = (key.to_string(), val.to_str()) { + ctx.insert_response_header(k.as_str(), v); + } + } + + ctx.insert_response_header( + RpHeaderResponseKey.as_str(), + custom_header, + ); + + ctx.insert_response_header("Content-Length", &*content_length.to_string()) + } + + /// Create request header to send/forward to BE: + /// - *Copy* origin request headers from ForwardProxy `ctx` + /// - *Add* custom ReverseProxy's request headers `custom_header` + /// - *Set* universal Content-Type and Content-Length + pub fn create_forward_request_headers( + ctx: &mut Layer8Context, + custom_header: &str, + content_length: usize, + ) -> HeaderMap { + // copy all origin header to new request + let origin_headers = ctx.get_request_header().clone(); + let mut reqwest_header = to_reqwest_header(origin_headers); + + // add forward proxy header `fp_request_header` + reqwest_header.insert( + RpHeaderRequestKey.as_str(), + custom_header.parse().unwrap(), + ); + + reqwest_header.insert("Content-Length", content_length.to_string().parse().unwrap()); + reqwest_header.insert("Content-Type", "application/json".parse().unwrap()); + reqwest_header.remove(IntHeaderRequestKey.as_str()); + reqwest_header.remove(FpHeaderRequestKey.as_str()); + + reqwest_header + } +} \ No newline at end of file diff --git a/reverse-proxy/src/handler/common/mod.rs b/reverse-proxy/src/handler/common/mod.rs new file mode 100644 index 0000000..a6ef363 --- /dev/null +++ b/reverse-proxy/src/handler/common/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod consts; +pub mod types; +pub mod handler; diff --git a/reverse-proxy/src/handler/common/types.rs b/reverse-proxy/src/handler/common/types.rs new file mode 100644 index 0000000..93b7cd4 --- /dev/null +++ b/reverse-proxy/src/handler/common/types.rs @@ -0,0 +1,17 @@ +use std::fmt::Debug; +use serde::{Deserialize, Serialize}; +use pingora_router::handler::ResponseBodyTrait; +use serde_json::Error; + +#[derive(Serialize, Deserialize, Debug)] +pub struct ErrorResponse { + pub error: String +} + +impl ResponseBodyTrait for ErrorResponse { + fn from_json_err(err: Error) -> Option { + Some(ErrorResponse { + error: err.to_string() + }) + } +} \ No newline at end of file diff --git a/reverse-proxy/src/handler/init_tunnel/handler.rs b/reverse-proxy/src/handler/init_tunnel/handler.rs new file mode 100644 index 0000000..2732dcb --- /dev/null +++ b/reverse-proxy/src/handler/init_tunnel/handler.rs @@ -0,0 +1,63 @@ +use log::{error, info}; +use pingora::http::StatusCode; +use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; +use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, ResponseBodyTrait}; +use reqwest::Client; +use crate::handler::common::consts::INIT_TUNNEL_TO_BACKEND_PATH; +use crate::handler::common::types::ErrorResponse; +use crate::handler::init_tunnel::{InitEncryptedTunnelRequest, InitTunnelRequestToBackend}; + +/// Struct containing only associated methods (no instance methods or fields) +pub(crate) struct InitTunnelHandler {} + +impl DefaultHandlerTrait for InitTunnelHandler {} + +impl InitTunnelHandler { + pub(crate) async fn validate_request_body(ctx: &mut Layer8Context) + -> Result + { + return match InitTunnelHandler::parse_request_body::< + InitEncryptedTunnelRequest, + ErrorResponse + >(&ctx.get_request_body()) + { + Ok(res) => Ok(res), + Err(err) => { + let body = match err { + None => None, + Some(err_response) => Some(err_response.to_bytes()) + }; + + InitTunnelHandler::send_result_to_be(false).await; + + Err(APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body, + }) + } + }; + } + + pub(crate) async fn send_result_to_be(result: bool) { + let body = InitTunnelRequestToBackend { + success: result, + }; + let log_meta = format!("[FORWARD {}]", INIT_TUNNEL_TO_BACKEND_PATH.as_str()); + info!("{log_meta} request to BE body: {:?}", body); + + let client = Client::new(); + match client.post(INIT_TUNNEL_TO_BACKEND_PATH.as_str()) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + { + Ok(res) => { + info!("{log_meta} Response sending init-tunnel result to BE: {:?}", res) + } + Err(err) => { + error!("{log_meta} Error sending init-tunnel result to BE: {:?}", err) + } + } + } +} diff --git a/reverse-proxy/src/handler/init_tunnel/mod.rs b/reverse-proxy/src/handler/init_tunnel/mod.rs new file mode 100644 index 0000000..29a1717 --- /dev/null +++ b/reverse-proxy/src/handler/init_tunnel/mod.rs @@ -0,0 +1,27 @@ +pub(crate) mod handler; + +use serde::{Deserialize, Serialize}; +use pingora_router::handler::{RequestBodyTrait, ResponseBodyTrait}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct InitEncryptedTunnelRequest { + pub public_key: Vec, +} + +impl RequestBodyTrait for InitEncryptedTunnelRequest {} + +#[derive(Serialize, Deserialize, Debug)] +pub struct InitTunnelRequestToBackend { + pub success: bool, +} + +impl RequestBodyTrait for InitTunnelRequestToBackend {} + +#[derive(Serialize, Deserialize, Debug)] +pub struct InitEncryptedTunnelResponse { + pub public_key: Vec, + pub t_b_hash: Vec, + pub session_id: String +} + +impl ResponseBodyTrait for InitEncryptedTunnelResponse {} diff --git a/reverse-proxy/src/handler/mod.rs b/reverse-proxy/src/handler/mod.rs index ac5860f..27c8859 100644 --- a/reverse-proxy/src/handler/mod.rs +++ b/reverse-proxy/src/handler/mod.rs @@ -1,128 +1,105 @@ -use log::{debug, error, info}; +use std::collections::HashMap; +use std::sync::{Mutex, MutexGuard}; +use log::debug; +use ntor::common::{InitSessionMessage, NTorParty}; +use ntor::server::NTorServer; use pingora::http::StatusCode; -use reqwest::{Client}; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; -use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait, ResponseBodyTrait}; -use reqwest::header::HeaderMap; -use crate::handler::consts::{INIT_TUNNEL_TO_BACKEND_PATH, PROXY_TO_BACKEND_PATH}; -use crate::handler::consts::HeaderKeys::{FpHeaderRequestKey, IntHeaderRequestKey, RpHeaderRequestKey, RpHeaderResponseKey}; -use crate::handler::types::{ErrorResponse, InitEncryptedTunnelRequest, InitEncryptedTunnelResponse, InitTunnelRequestToBackend, ProxyRequest, ProxyRequestToBackend, ProxyResponse, ProxyResponseFromBackend}; - -pub mod types; -mod consts; -mod utils; - -pub struct ReverseHandler {} - -impl DefaultHandlerTrait for ReverseHandler {} +use pingora_router::handler::{APIHandlerResponse, ResponseBodyTrait}; +use crate::handler::common::consts::HeaderKeys::{RpHeaderResponseKey}; +use init_tunnel::handler::InitTunnelHandler; +use proxy::handler::ProxyHandler; +use init_tunnel::InitEncryptedTunnelResponse; +use utils::{new_uuid, string_to_array32}; +use crate::config::HandlerConfig; + +mod common; +mod init_tunnel; +mod proxy; + +thread_local! { + // + static NTOR_SHARED_SECRETS: Mutex>> = Mutex::new(HashMap::new()); +} + +pub struct ReverseHandler { + config: HandlerConfig, + ntor_static_secret: [u8; 32], +} impl ReverseHandler { - /// Add response headers to `ctx` to respond to FP: - /// - *Copy* Backend's response header in `headers` - *update* `Content-Length` - /// - *Add* custom ReverseProxy's response headers `custom_header` - fn create_response_headers( - headers: HeaderMap, - ctx: &mut Layer8Context, - custom_header: &str, - content_length: usize, - ) { - for (key, val) in headers.iter() { - if let (k, Ok(v)) = (key.to_string(), val.to_str()) { - ctx.insert_response_header(k.as_str(), v); - } - } - - ctx.insert_response_header( - RpHeaderResponseKey.as_str(), - custom_header, - ); - - ctx.insert_response_header("Content-Length", &*content_length.to_string()) - } - - /// Create request header to send/forward to BE: - /// - *Copy* origin request headers from ForwardProxy `ctx` - /// - *Add* custom ReverseProxy's request headers `custom_header` - /// - *Set* universal Content-Type and Content-Length - fn create_forward_request_headers( - ctx: &mut Layer8Context, - custom_header: &str, - content_length: usize, - ) -> HeaderMap { - // copy all origin header to new request - let origin_headers = ctx.get_request_header().clone(); - let mut reqwest_header = utils::to_reqwest_header(origin_headers); - - // add forward proxy header `fp_request_header` - reqwest_header.insert( - RpHeaderRequestKey.as_str(), - custom_header.parse().unwrap(), - ); - - reqwest_header.insert("Content-Length", content_length.to_string().parse().unwrap()); - reqwest_header.insert("Content-Type", "application/json".parse().unwrap()); - reqwest_header.remove(IntHeaderRequestKey.as_str()); - reqwest_header.remove(FpHeaderRequestKey.as_str()); + pub fn new(config: HandlerConfig) -> Self { + let ntor_secret = string_to_array32(config.ntor_static_secret.clone()).unwrap(); - reqwest_header + ReverseHandler { + config, + ntor_static_secret: ntor_secret, + } } - async fn init_tunnel_result_to_be(result: bool) { - let body = InitTunnelRequestToBackend { - success: result, - }; - let log_meta = format!("[FORWARD {}]", INIT_TUNNEL_TO_BACKEND_PATH.as_str()); - info!("{log_meta} request to BE body: {:?}", body); - - let client = Client::new(); - match client.post(INIT_TUNNEL_TO_BACKEND_PATH.as_str()) - .header("Content-Type", "application/json") - .json(&body) - .send() - .await - { - Ok(res) => { - info!("{log_meta} Response sending init-tunnel result to BE: {:?}", res) - } - Err(err) => { - error!("{log_meta} Error sending init-tunnel result to BE: {:?}", err) + fn get_ntor_shared_secret(&self, session_id: String) -> Result, APIHandlerResponse> { + let shared_secret = NTOR_SHARED_SECRETS.with(|memory| { + let guard = memory.lock().unwrap(); + guard.get(&session_id).cloned() + }); + + return match shared_secret { + Some(secret) => Ok(secret.clone()), + None => { + Err(APIHandlerResponse { + status: StatusCode::UNAUTHORIZED, + body: Some("Invalid or expired nTor session ID".as_bytes().to_vec()), + }) } } } pub async fn handle_init_tunnel(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { // validate request body - let request_body = match ReverseHandler::parse_request_body:: - (&ctx.get_request_body()) - { - Ok(res) => res.to_bytes(), - Err(err) => { - let body = match err { - None => None, - Some(err_response) => Some(err_response.to_bytes()) - }; - - ReverseHandler::init_tunnel_result_to_be(false).await; - - return APIHandlerResponse { - status: StatusCode::BAD_REQUEST, - body, - }; - } + let request_body = match InitTunnelHandler::validate_request_body(ctx).await { + Ok(res) => res, + Err(res) => return res }; debug!("[REQUEST /init-tunnel] Parsed body: {:?}", request_body); - // todo validate request headers + // todo I think there are prettier ways to use nTor since we are free to modify the nTor crate, but I'm lazy + let mut ntor_server = NTorServer::new_with_secret( + self.config.ntor_server_id.clone(), + self.ntor_static_secret, + ); - // set ReverseProxy's response header - ctx.insert_response_header(RpHeaderResponseKey.as_str(), RpHeaderResponseKey.placeholder_value()); + if request_body.public_key.len() != 32 { + return APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: Some("Invalid public key length".as_bytes().to_vec()), + }; + } + + // Client initializes session with the server + let init_session_msg = InitSessionMessage::from(request_body.public_key); + + let init_session_response = ntor_server.accept_init_session_request(&init_session_msg); + + let ntor_session_id = new_uuid(); - // ReverseProxy's response body let response = InitEncryptedTunnelResponse { - rp_response_body: "body added in ReverseProxy".to_string(), + public_key: init_session_response.public_key(), + t_b_hash: init_session_response.t_b_hash(), + session_id: ntor_session_id.clone(), }; - ReverseHandler::init_tunnel_result_to_be(true).await; + // set ReverseProxy's response header + ctx.insert_response_header( + RpHeaderResponseKey.as_str(), + RpHeaderResponseKey.placeholder_value() + ); + + InitTunnelHandler::send_result_to_be(true).await; + + NTOR_SHARED_SECRETS.with(|memory| { + let mut guard: MutexGuard>> = memory.lock().unwrap(); + guard.insert(ntor_session_id, ntor_server.get_shared_secret().unwrap()); + }); APIHandlerResponse { status: StatusCode::OK, @@ -130,130 +107,55 @@ impl ReverseHandler { } } - /// - get spa_request_body from received request body - /// - send spa body to backend with ReverseProxy header - async fn proxy_request_to_backend( - &self, - ctx: &mut Layer8Context, - headers: HeaderMap, - body: ProxyRequestToBackend, - ) -> APIHandlerResponse { - let new_body_string = String::from_utf8_lossy(&body.to_bytes()).to_string(); - - let log_meta = format!("[FORWARD {}]", PROXY_TO_BACKEND_PATH.as_str()); - info!("{log_meta} request to BE headers: {:?}", headers); - info!("{log_meta} request to BE body: {}", new_body_string); + pub async fn handle_proxy_request(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { - let client = Client::new(); - let response = client.post(PROXY_TO_BACKEND_PATH.as_str()) - .header("Content-Type", "application/json") - .headers(headers) - .body(new_body_string) - .send() - .await; + // validate request headers (nTor session ID) + let session_id = match ProxyHandler::validate_request_headers(ctx) { + Ok(session_id) => session_id, + Err(res) => return res, + }; - match response { - Ok(reqw_response) if reqw_response.status().is_success() => { - let headers = reqw_response.headers().clone(); - info!("{log_meta} response from BE headers: {:?}", reqw_response.headers()); - return match reqw_response.json::().await { - Ok(res) => { - info!("{log_meta} response from BE body: {:?}", res); - // create new response body from backend's response - let proxy_response = ProxyResponse { - be_response_body: res.be_response_body, - rp_response_body: "body added in ReverseProxy".to_string(), - }.to_bytes(); + let shared_secret = match self.get_ntor_shared_secret(session_id) { + Ok(secret) => secret, + Err(res) => return res, + }; - ReverseHandler::create_response_headers( - headers, - ctx, - RpHeaderResponseKey.placeholder_value(), - proxy_response.len(), - ); + // validate request body + let request_body = match ProxyHandler::validate_request_body(ctx) { + Ok(res) => res, + Err(res) => return res, + }; - APIHandlerResponse { - status: StatusCode::OK, - body: Some(proxy_response), - } - } - Err(err) => { - error!("Parsing backend body error: {:?}", err); - APIHandlerResponse { - status: StatusCode::INTERNAL_SERVER_ERROR, - body: None, - } - } - }; - } - Ok(res) => { - // Handle 4xx/5xx errors - let status = res.status(); - error!("{log_meta} BE Response: {:?}", res); + let wrapped_request = match ProxyHandler::decrypt_request_body( + request_body, + self.config.ntor_server_id.clone(), + shared_secret.clone(), + ) { + Ok(req) => req, + Err(res) => return res, + }; + debug!("[REQUEST /proxy] Decrypted request: {:?}", wrapped_request); - let error_body = match res.content_length() { - None => "internal-server-error".to_string(), - Some(_) => { - res.text().await.unwrap_or_else(|_e| "".to_string()) - } - }; + // reconstruct user request + let wrapped_response = match ProxyHandler::rebuild_user_request(wrapped_request).await { + Ok(res) => res, + Err(res) => return res, + }; - let response_bytes = ErrorResponse { - error: error_body - }.to_bytes(); + debug!("[RESPONSE /proxy] Wrapped Backend response: {:?}", wrapped_response); + return match ProxyHandler::encrypt_response_body( + wrapped_response, + self.config.ntor_server_id.clone(), + shared_secret + ) { + Ok(encrypted_message) => { APIHandlerResponse { - status: StatusCode::try_from(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), - body: Some(response_bytes), - } - } - Err(err) => { - error!("{log_meta} Error: {:?}", err); - let status = err.status().unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR); - let err_body = ErrorResponse { - error: format!("Backend error: {}", status), - }; - - APIHandlerResponse { - status: StatusCode::BAD_GATEWAY, - body: Some(err_body.to_bytes()), + status: StatusCode::OK, + body: Some(encrypted_message.to_bytes()), } } + Err(res) => res } } - - pub async fn handle_proxy_request(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { - // validate request body - let body = ctx.get_request_body(); - let request_body = match ReverseHandler::parse_request_body::(&body) { - Ok(proxy_request) => proxy_request, - Err(err) => { - let err_body = match err { - None => None, - Some(err_response) => { - error!("Error parsing request body: {}", err_response.error); - Some(err_response.to_bytes()) - } - }; - - return APIHandlerResponse { - status: StatusCode::BAD_REQUEST, - body: err_body, - }; - } - }; - - let new_body = ProxyRequestToBackend { - spa_request_body: request_body.spa_request_body, - }; - - // todo validate request headers - let new_header = ReverseHandler::create_forward_request_headers( - ctx, - RpHeaderRequestKey.placeholder_value(), - new_body.to_bytes().len(), - ); - - self.proxy_request_to_backend(ctx, new_header, new_body).await - } } \ No newline at end of file diff --git a/reverse-proxy/src/handler/proxy/handler.rs b/reverse-proxy/src/handler/proxy/handler.rs new file mode 100644 index 0000000..1467d06 --- /dev/null +++ b/reverse-proxy/src/handler/proxy/handler.rs @@ -0,0 +1,201 @@ +use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; +use reqwest::header::HeaderMap; +use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, ResponseBodyTrait}; +use log::{debug, error, info}; +use ntor::common::NTorParty; +use ntor::server::NTorServer; +use reqwest::Client; +use pingora::http::StatusCode; +use utils::bytes_to_json; +use crate::handler::common::consts::{HeaderKeys, BACKEND_HOST}; +use crate::handler::common::types::ErrorResponse; +use crate::handler::proxy::{EncryptedMessage, Layer8ResponseObject, Layer8RequestObject}; + +/// Struct containing only associated methods (no instance methods or fields) +pub struct ProxyHandler {} + +impl DefaultHandlerTrait for ProxyHandler {} + +impl ProxyHandler { + /// Validates the request headers for the nTor session ID. + pub(crate) fn validate_request_headers( + ctx: &mut Layer8Context + ) -> Result + { + return match ctx.get_request_header().get(HeaderKeys::NTorSessionIDKey.as_str()) { + None => { + Err(APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: Some(ErrorResponse { + error: "Missing nTor session ID header".to_string(), + }.to_bytes()), + }) + } + Some(session_id) => { + if session_id.is_empty() { + return Err(APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: Some(ErrorResponse { + error: "Empty nTor session ID header".to_string(), + }.to_bytes()), + }); + } + info!("nTor session ID: {}", session_id); + + // todo validate session_id format + + Ok(session_id.to_string()) + } + }; + } + + pub(crate) fn validate_request_body( + ctx: &mut Layer8Context + ) -> Result + { + match ProxyHandler::parse_request_body::< + EncryptedMessage, + ErrorResponse + >(&ctx.get_request_body()) { + Ok(res) => Ok(res), + Err(err) => { + let body = match err { + None => None, + Some(err_response) => { + error!("Error parsing request body: {}", err_response.error); + Some(err_response.to_bytes()) + } + }; + Err(APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body, + }) + } + } + } + + pub(crate) fn decrypt_request_body( + request_body: EncryptedMessage, + ntor_server_id: String, + shared_secret: Vec, + ) -> Result + { + let mut ntor_server = NTorServer::new(ntor_server_id); + ntor_server.set_shared_secret(shared_secret.clone()); + + // Decrypt the request body using nTor shared secret + let decrypted_data = ntor_server.decrypt(ntor::common::EncryptedMessage { + nonce: <[u8; 12]>::try_from(request_body.nonce).unwrap(), + data: request_body.data, + }).map_err(|err| { + return APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: Some(format!("Decryption failed: {}", err).as_bytes().to_vec()), + }; + })?; + // let decrypted_data = request_body.data; + + // parse decrypted data into WrappedUserRequest + let wrapped_request: Layer8RequestObject = bytes_to_json(decrypted_data) + .map_err(|err| { + return APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: Some(format!("Failed to parse request body: {}", err).as_bytes().to_vec()), + }; + })?; + + Ok(wrapped_request) + } + + pub(crate) async fn rebuild_user_request( + wrapped_request: Layer8RequestObject + ) -> Result + { + let header_map = utils::hashmap_to_headermap(&wrapped_request.headers) + .unwrap_or_else(|_| HeaderMap::new()); + debug!("[FORWARD {}] Reconstructed request headers: {:?}", wrapped_request.uri, header_map); + + let body = utils::bytes_to_string(&wrapped_request.body); + debug!("[FORWARD {}] Reconstructed request body: {}", wrapped_request.uri, body); + + let url = format!("{}{}", BACKEND_HOST, wrapped_request.uri); + debug!("[FORWARD {}] Request URL: {}", wrapped_request.uri, url); + + let client = Client::new(); + + let response = client.request( + wrapped_request.method.parse().unwrap(), + url.as_str(), + ) + .headers(header_map.clone()) + .body(body) + .send() + .await; + + return match response { + Ok(success_res) => { + let status = success_res.status().as_u16(); + let serialized_headers = utils::headermap_to_hashmap(&success_res.headers()); + let serialized_body: Vec = success_res.bytes().await.unwrap_or_default().to_vec(); + + debug!( + "[FORWARD {}] Response from backend headers: {:?}", + wrapped_request.uri, + serialized_headers + ); + debug!( + "[FORWARD {}] Response from backend body: {}", + wrapped_request.uri, + utils::bytes_to_string(&serialized_body) + ); + + Ok(Layer8ResponseObject { + status, + headers: serialized_headers, + body: serialized_body, + }) + } + Err(err) => { + error!("[FORWARD] Error while building request to BE: {:?}", err); + let status = err.status().unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR); + let err_body = ErrorResponse { + error: format!("Backend error: {}", status), + }; + + Err(APIHandlerResponse { + status: StatusCode::BAD_GATEWAY, + body: Some(err_body.to_bytes()), + }) + } + }; + } + + pub(crate) fn encrypt_response_body( + response_body: Layer8ResponseObject, + ntor_server_id: String, + shared_secret: Vec, + ) -> Result + { + let mut ntor_server = NTorServer::new(ntor_server_id); + ntor_server.set_shared_secret(shared_secret); + + let data = response_body.to_bytes(); + + // Encrypt the response body using nTor shared secret + let encrypted_data = ntor_server.encrypt(data).map_err(|err| { + return APIHandlerResponse { + status: StatusCode::INTERNAL_SERVER_ERROR, + body: Some(format!("Encryption failed: {}", err).as_bytes().to_vec()), + }; + })?; + // let encrypted_data = ntor::common::EncryptedMessage { + // nonce: [0; 12], // Placeholder, replace with actual nonce generation + // data, + // }; + + Ok(EncryptedMessage { + nonce: encrypted_data.nonce.to_vec(), + data: encrypted_data.data, + }) + } +} diff --git a/reverse-proxy/src/handler/proxy/mod.rs b/reverse-proxy/src/handler/proxy/mod.rs new file mode 100644 index 0000000..732d815 --- /dev/null +++ b/reverse-proxy/src/handler/proxy/mod.rs @@ -0,0 +1,32 @@ +pub(crate) mod handler; + +use std::collections::HashMap; +use pingora_router::handler::{RequestBodyTrait, ResponseBodyTrait}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct EncryptedMessage { + pub nonce: Vec, + pub data: Vec +} + +impl RequestBodyTrait for EncryptedMessage {} +impl ResponseBodyTrait for EncryptedMessage {} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Layer8RequestObject { + pub method: String, + pub uri: String, + pub headers: HashMap, + pub body: Vec +} +impl RequestBodyTrait for Layer8RequestObject {} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Layer8ResponseObject { + pub status: u16, + pub headers: HashMap, + pub body: Vec +} + +impl ResponseBodyTrait for Layer8ResponseObject {} \ No newline at end of file diff --git a/reverse-proxy/src/handler/types.rs b/reverse-proxy/src/handler/types.rs deleted file mode 100644 index b35c5fd..0000000 --- a/reverse-proxy/src/handler/types.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::fmt::Debug; -use serde::{Deserialize, Serialize}; -use pingora_router::handler::{RequestBodyTrait, ResponseBodyTrait}; -use serde_json::Error; - -#[derive(Serialize, Deserialize, Debug)] -pub struct InitEncryptedTunnelRequest { - pub int_request_body: String, -} - -impl RequestBodyTrait for InitEncryptedTunnelRequest {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct InitTunnelRequestToBackend { - pub success: bool, -} - -impl RequestBodyTrait for InitTunnelRequestToBackend {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct InitEncryptedTunnelResponse { - pub rp_response_body: String -} - -impl ResponseBodyTrait for InitEncryptedTunnelResponse {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ProxyRequest { - pub int_request_body: String, - pub spa_request_body: String -} - -impl RequestBodyTrait for ProxyRequest {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ProxyRequestToBackend { - pub spa_request_body: String -} - -impl RequestBodyTrait for ProxyRequestToBackend {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ProxyResponseFromBackend { - pub be_response_body: String -} - -impl ResponseBodyTrait for ProxyResponseFromBackend {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ProxyResponse { - pub be_response_body: String, - pub rp_response_body: String -} - -impl ResponseBodyTrait for ProxyResponse {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ErrorResponse { - pub error: String -} - -impl ResponseBodyTrait for ErrorResponse { - fn from_json_err(err: Error) -> Option { - Some(ErrorResponse { - error: err.to_string() - }) - } -} \ No newline at end of file diff --git a/reverse-proxy/src/handler/utils.rs b/reverse-proxy/src/handler/utils.rs deleted file mode 100644 index d8fad5f..0000000 --- a/reverse-proxy/src/handler/utils.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::collections::HashMap; -use reqwest::header::HeaderMap; - -pub fn to_reqwest_header(map: HashMap) -> HeaderMap { - let mut header_map = HeaderMap::new(); - for (k, v) in map { - if let Ok(header_name) = reqwest::header::HeaderName::try_from(k.as_str()) { - if let Ok(header_value) = reqwest::header::HeaderValue::from_str(&v) { - header_map.insert(header_name, header_value); - } - } - } - header_map -} \ No newline at end of file diff --git a/reverse-proxy/src/main.rs b/reverse-proxy/src/main.rs index 1febd0c..e2088dc 100644 --- a/reverse-proxy/src/main.rs +++ b/reverse-proxy/src/main.rs @@ -2,6 +2,7 @@ mod handler; mod proxy; mod tls_conf; +use std::env; use std::net::ToSocketAddrs; use crate::handler::ReverseHandler; @@ -15,6 +16,7 @@ use pingora_router::handler::APIHandler; use pingora_router::router::Router; use proxy::{BACKEND_PORT, ReverseProxy, UPSTREAM_IP}; use std::sync::Arc; +mod config; fn main() { // let file = OpenOptions::new() @@ -45,7 +47,8 @@ fn main() { // Load environment variables from .env file dotenv::dotenv().ok(); - env_logger::Builder::from_env(Env::default().write_style_or("RUST_LOG_STYLE", "always")) + env_logger::Builder::from_env(Env::default() + .write_style_or("RUST_LOG_STYLE", "always")) .format_file(true) .format_line_number(true) .target(Target::Stdout) @@ -54,8 +57,7 @@ fn main() { let mut my_server = Server::new(Some(Opt { conf: std::env::var("SERVER_CONF").ok(), ..Default::default() - })) - .unwrap(); + })).unwrap(); my_server.bootstrap(); @@ -65,7 +67,12 @@ fn main() { let handle_proxy: APIHandler> = Box::new(|h, ctx| async move { h.handle_proxy_request(ctx).await }.boxed()); - let rp_handler = Arc::new(ReverseHandler {}); + let config_path = env::var("CONFIG_PATH").unwrap_or_else(|_| "config.toml".to_string()); + let backbone_config = config::Config::from_file(&config_path); + backbone_config.validate(); + println!("{:?}", backbone_config); + + let rp_handler = Arc::new(ReverseHandler::new(backbone_config.handler)); let mut router: Router> = Router::new(rp_handler.clone()); router.post("/init-tunnel".to_string(), Box::new([handle_init_tunnel])); router.post("/proxy".to_string(), Box::new([handle_proxy])); diff --git a/reverse-proxy/src/proxy.rs b/reverse-proxy/src/proxy.rs index ae410ed..a8560bb 100644 --- a/reverse-proxy/src/proxy.rs +++ b/reverse-proxy/src/proxy.rs @@ -1,7 +1,7 @@ use pingora::prelude::{HttpPeer, ProxyHttp}; use pingora::proxy::Session; use pingora::http::{ResponseHeader, StatusCode}; -use log::{info}; +use log::{error, info}; use async_trait::async_trait; use bytes::Bytes; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; @@ -77,61 +77,63 @@ impl ProxyHttp for ReverseProxy { Ok(peer) } + /// Handle request/response data by creating a new request to BE and respond to FP async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> pingora::Result where Self::CTX: Send + Sync, { - // // create Context - // ctx.update(session).await?; - // let request_summary = format!( - // "{} {}", - // session.req_header().method, - // session.req_header().uri.to_string() - // ); - // println!(); - // info!("[REQUEST {}] {:?}", request_summary, ctx.request); - // info!( - // "[REQUEST {}] Decoded body: {}", - // request_summary, - // String::from_utf8_lossy(&*ctx.get_request_body()) - // ); - // println!(); - - // let handler_response = self.router.call_handler(ctx).await; - - // let mut response_bytes = vec![]; - // if let Some(body_bytes) = handler_response.body { - // ctx.insert_response_header("Content-length", &body_bytes.len().to_string()); - // response_bytes = body_bytes; - // }; - // ReverseProxy::::set_headers(session, ctx, handler_response.status).await?; - - // info!( - // "[RESPONSE {}] Body: {}", - // request_summary, - // String::from_utf8_lossy(&*response_bytes) - // ); - // println!(); - - // let body = match response_bytes.is_empty() { - // true => None, - // false => Some(Bytes::from(response_bytes)), - // }; - - // session.write_response_body(body, true).await?; - - Ok(false) + // create Context + ctx.update(session).await?; + ctx.read_request_body(session).await?; + let request_summary = session.request_summary(); + println!(); + info!("[REQUEST {}] {:?}", request_summary, ctx.request); + info!("[REQUEST {}] Decoded body: {}", request_summary, String::from_utf8_lossy(&*ctx.get_request_body())); + println!(); + + let handler_response = self.router.call_handler(ctx).await; + if handler_response.status == StatusCode::NOT_FOUND && handler_response.body.is_none() { + let header = ResponseHeader::build(StatusCode::NOT_FOUND, None)?; + session.write_response_header_ref(&header).await?; + session.set_keepalive(None); + return Ok(true); + } + + let mut response_bytes = vec![]; + if let Some(body_bytes) = handler_response.body { + ctx.insert_response_header("Content-length", &body_bytes.len().to_string()); + response_bytes = body_bytes; + }; + ReverseProxy::::set_headers(session, ctx, handler_response.status).await?; + + info!("[RESPONSE {}] Body: {}", request_summary, String::from_utf8_lossy(&*response_bytes)); + println!(); + + // Write the response body to the session after setting headers + session.write_response_body(Some(Bytes::from(response_bytes)), true).await?; + + Ok(true) } async fn logging( &self, session: &mut Session, - _e: Option<&pingora::Error>, + e: Option<&pingora::Error>, ctx: &mut Self::CTX, ) { let response_code = session .response_written() .map_or(0, |resp| resp.status.as_u16()); + + if !e.is_none() { + // log error + error!( + "{} error: {}", + self.request_summary(session, ctx), + e.as_ref().unwrap() + ); + } + // access log info!( "{} response code: {response_code}", diff --git a/spa/backend/index.js b/spa/backend/index.js index 190820d..aed349f 100644 --- a/spa/backend/index.js +++ b/spa/backend/index.js @@ -9,7 +9,7 @@ const path = require("path"); const multer = require("multer"); const app = express(); -const port = 6191; +const port = 3000; const SECRET_KEY = "my_very_secret_key"; app.use(express.json()); @@ -118,6 +118,7 @@ app.post("/register", async (req, res) => { }); app.post("/login", async (req, res) => { + console.log("reached login endpoint"); const { username, password } = req.body; const user = users.find((u) => u.username === username); diff --git a/utils/Cargo.toml b/utils/Cargo.toml new file mode 100644 index 0000000..b8b799d --- /dev/null +++ b/utils/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "utils" +version = "0.1.0" +edition = "2024" + +[dependencies] +reqwest = { version = "0.11", features = ["json"] } +uuid = { version = "1.16.0", features = ["v4"] } +serde_json = "1.0.140" +serde = { version = "1.0.219", features = ["derive"] } +base64 = "0.21.7" +log = "0.4.27" diff --git a/utils/src/lib.rs b/utils/src/lib.rs new file mode 100644 index 0000000..bfeaf42 --- /dev/null +++ b/utils/src/lib.rs @@ -0,0 +1,123 @@ +use std::collections::HashMap; +use base64::Engine; +use base64::engine::general_purpose; +use uuid::Uuid; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; + +use serde::{Deserialize, Serialize}; +use log::error; + +pub fn to_reqwest_header(map: HashMap) -> HeaderMap { + let mut header_map = HeaderMap::new(); + for (k, v) in map { + if let Ok(header_name) = HeaderName::try_from(k.as_str()) { + if let Ok(header_value) = HeaderValue::from_str(&v) { + header_map.insert(header_name, header_value); + } + } + } + header_map +} + +pub fn new_uuid() -> String { + Uuid::new_v4().to_string() +} + +pub fn vec_to_json(vec: Vec) -> String { + serde_json::to_string(&vec).unwrap() +} + +pub fn json_to_vec(json: &str) -> Vec { + serde_json::from_str(json).unwrap() +} + +pub fn string_to_array32(s: String) -> Option<[u8; 32]> { + let bytes = s.into_bytes(); + if bytes.len() == 32 { + Some(bytes.try_into().unwrap()) + } else { + None + } +} + +pub fn bytes_to_json(bytes: Vec) -> Result +where + T: Serialize + for<'de> Deserialize<'de>, +{ + serde_json::from_slice::(&bytes) +} + +pub fn bytes_to_string(bytes: &Vec) -> String { + String::from_utf8_lossy(bytes).to_string() +} + +// String to HeaderMap +pub fn string_to_headermap(s: &str) -> Result> { + let pairs: Vec<(String, String)> = serde_json::from_str(s)?; + let mut headers = HeaderMap::new(); + for (k, v) in pairs { + let name = HeaderName::from_bytes(k.as_bytes())?; + let value = HeaderValue::from_str(&v)?; + headers.insert(name, value); + } + Ok(headers) +} + +// HeaderMap to String +pub fn headermap_to_string(headers: &HeaderMap) -> String { + let pairs: Vec<(String, String)> = headers.iter() + .map( + |(k, v)| ( + k.to_string(), + v.to_str().unwrap_or("").to_string() + ) + ) + .collect(); + serde_json::to_string(&pairs).unwrap() +} + +// http::header::value::HeaderValue to serde_json::Value +fn headervalue_to_json(val: &HeaderValue) -> serde_json::Value { + match val.to_str() { + Ok(s) => serde_json::Value::String(s.to_string()), + Err(_) => serde_json::Value::String(general_purpose::STANDARD.encode(val.as_bytes())), + } +} + +// serde_json::Value to http::header::value::HeaderValue +fn json_to_headervalue( + val: &serde_json::Value +) -> Result +{ + match val { + serde_json::Value::String(s) => HeaderValue::from_str(s), + _ => HeaderValue::from_str(&val.to_string()), + } +} + +pub fn hashmap_to_headermap( + map: &HashMap +) -> Result> +{ + let mut headers = HeaderMap::new(); + for (k, v) in map { + let name = HeaderName::from_bytes(k.as_bytes())?; + let value = json_to_headervalue(v) + .map_err(|e| { + error!("Invalid header value for '{}': {}", k, e); + }) + .unwrap_or_else(|_| HeaderValue::from_str("").unwrap()); + headers.insert(name, value); + } + Ok(headers) +} + +pub fn headermap_to_hashmap(headers: &HeaderMap) -> HashMap { + let mut map = HashMap::new(); + for (k, v) in headers.iter() { + let key = k.as_str().to_string(); + let value = headervalue_to_json(v); + map.insert(key, value); + } + map +} \ No newline at end of file