Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions forward-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ bytes = "1.10.1"
clap = { version = "3.2.25", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
log = "0.4.27"
chrono = "0.4.40"
reqwest = { version="0.11", default-features=false, features=["json", "rustls-tls"] }
tokio = { version = "1.44.2", features = ["rt-multi-thread", "macros"] }
Expand All @@ -21,7 +20,7 @@ pingora-router = { path = "../pingora-router", version = "0.1.0" }
once_cell = "1.21.3"
pingora-error = "0.5.0"
boring = "4.17.0"
env_logger = "0.11.8"
utils = { path = "../utils", version = "0.1.0" }
hex = "0.4.3"
envy = "0.4.2"
tracing = "0.1.41"
24 changes: 1 addition & 23 deletions forward-proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct FPConfig {

#[derive(Debug, Deserialize)]
pub struct LogConfig {
#[allow(dead_code)]
pub log_path: String,
pub log_level: String,
}
Expand All @@ -37,27 +38,4 @@ pub struct TlsConfig {
pub ca_cert: String,
pub cert: String,
pub key: String,
}

impl LogConfig {
pub fn to_level_filter(&self) -> log::LevelFilter {
match self.log_level.to_uppercase().as_str() {
"INFO" => log::LevelFilter::Info,
"DEBUG" => log::LevelFilter::Debug,
"WARNING" => log::LevelFilter::Warn,
"ERROR" => log::LevelFilter::Error,
"TRACE" => log::LevelFilter::Trace,
"OFF" => log::LevelFilter::Off,
_ => log::max_level()
}
}
}

impl TlsConfig {
pub fn load(&mut self) -> Result<(), String> {
// todo validate certs?
// this method was created to load certificates from files but now certs are directly in config.
// so it does nothing for now, but kept for future use to validate certs if needed
Ok(())
}
}
11 changes: 11 additions & 0 deletions forward-proxy/src/handler/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,15 @@ impl CtxKeys {
CtxKeys::FpRpJwt => "fp_rp_jwt".to_string(),
}
}
}

pub struct LogTypes {}

impl LogTypes {
pub const ACCESS_LOG: &'static str = "ACCESS_LOG";
pub const ACCESS_LOG_RESULT: &'static str = "ACCESS_LOG_RESULT";
pub const UPSTREAM_CONNECT: &'static str = "UPSTREAM_CONNECT";
pub const HANDLE_CLIENT_REQUEST: &'static str = "HANDLE_CLIENT_REQUEST";
pub const HANDLE_UPSTREAM_RESPONSE: &'static str = "HANDLE_UPSTREAM_RESPONSE";
pub const HEALTHCHECK: &'static str = "HEALTHCHECK";
}
60 changes: 42 additions & 18 deletions forward-proxy/src/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use log::{error, info};
use pingora::http::StatusCode;
use reqwest::Client;
use pingora_router::ctx::{Layer8Context, Layer8ContextTrait};
use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait};
use crate::handler::types::response::{ErrorResponse, FpHealthcheckError, FpHealthcheckSuccess, InitTunnelResponseFromRP, InitTunnelResponseToINT};
use pingora_router::handler::ResponseBodyTrait;
use serde::Deserialize;
use tracing::{debug, error, info};
use crate::handler::types::request::InitTunnelRequest;
use utils;
use utils::jwt::JWTClaims;
use crate::config::HandlerConfig;
use crate::handler::consts::LogTypes;
Comment thread
dtpthao marked this conversation as resolved.

pub mod types;
pub mod consts;
Expand Down Expand Up @@ -51,20 +52,20 @@ impl ForwardHandler {
{
let client = Client::new();

let res = client.get(
//todo
// the input backend_url is originally from interceptor request,
// the interceptor only accepts URLs with http(s) scheme.
// But the authentication server expects a URL without scheme
format!(
"{}{}",
self.config.auth_get_certificate_url,
backend_url.replace("http://", "").replace("https://", "")
)
)
//todo
// the input backend_url is originally from interceptor request,
// the interceptor only accepts URLs with http(s) scheme.
// But the authentication server expects a URL without scheme
let request_path = format!(
"{}{}",
self.config.auth_get_certificate_url,
backend_url.replace("http://", "").replace("https://", "")
);
let res = client.get(&request_path)
.header("Authorization", format!("Bearer {}", self.config.auth_access_token))
.send()
.await
// unable to connect
.map_err(|e| {
let response_body = ErrorResponse {
error: format!("Failed to connect to layer8: {}", e)
Expand All @@ -76,11 +77,17 @@ impl ForwardHandler {
}
})?;

// connected but request failed
if !res.status().is_success() {
let response_body = ErrorResponse {
error: format!("Failed to get public key from layer8, status code: {}", res.status().as_u16()),
};
error!("Sending error response: {:?}", response_body);
error!(
log_type=LogTypes::HANDLE_CLIENT_REQUEST,
"Failed to get ntor certificate for {}: {:?}",
request_path,
response_body
Comment thread
dtpthao marked this conversation as resolved.
Outdated
);

ctx.insert_response_header("Connection", "close"); // Ensure connection closes???

Expand All @@ -95,7 +102,11 @@ impl ForwardHandler {
}

let cert: AuthServerResponse = res.json().await.map_err(|err| {
error!("Failed to parse authentication server response: {:?}", err);
error!(
log_type=LogTypes::HANDLE_CLIENT_REQUEST,
"Failed to parse authentication server response: {:?}",
err
);
APIHandlerResponse {
status: StatusCode::INTERNAL_SERVER_ERROR,
body: None,
Expand All @@ -104,14 +115,23 @@ impl ForwardHandler {

let pub_key = utils::cert::extract_x509_pem(cert.x509_certificate.clone())
.map_err(|e| {
error!("Failed to parse x509 certificate: {:?}", e);
error!(
log_type=LogTypes::HANDLE_CLIENT_REQUEST,
"Failed to parse x509 certificate: {:?}",
e
);
APIHandlerResponse {
status: StatusCode::INTERNAL_SERVER_ERROR,
body: None,
}
})?;

info!("AuthenticationServer response: {:?}", cert);
debug!("AuthenticationServer response: {:?}", cert);
info!(
log_type=LogTypes::HANDLE_CLIENT_REQUEST,
"Obtained ntor credentials for backend_url: {}",
backend_url
);

Ok(NTorServerCertificate {
server_id: backend_url, // todo I still prefer taking the server_id value from certificate's subject
Expand Down Expand Up @@ -175,7 +195,7 @@ impl ForwardHandler {
Ok(cert) => cert,
Err(err) => return err
};
info!("Server certificate: {:?}", server_certificate);
debug!("Server certificate: {:?}", server_certificate);

ctx.set(
consts::CtxKeys::NTorServerId.to_string(),
Expand Down Expand Up @@ -203,7 +223,11 @@ impl ForwardHandler {

return match utils::bytes_to_json::<InitTunnelResponseFromRP>(response_body) {
Err(e) => {
error!("Error parsing RP response: {:?}", e);
error!(
log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE,
"Error parsing RP response: {:?}",
e
);
APIHandlerResponse {
status: StatusCode::INTERNAL_SERVER_ERROR,
body: None,
Expand Down
41 changes: 11 additions & 30 deletions forward-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,32 @@ mod proxy;
mod handler;
mod config;

use std::fs::OpenOptions;
use crate::handler::ForwardHandler;
use env_logger::{Env, Target};
use log::{debug, info};
use proxy::ForwardProxy;
use pingora::prelude::*;
use crate::config::FPConfig;
use tracing::{info, debug};

fn load_config() -> FPConfig {
// Load environment variables from .env file
dotenv::dotenv().ok();

// Deserialize from env vars
let mut config: FPConfig = envy::from_env().expect("Failed to load config");
let config: FPConfig = envy::from_env().expect("Failed to load config");

config.tls_config.load().expect("Failed to load TLS configuration");

let target = match config.log_config.log_path.as_str() {
"console" => Target::Stdout,
path => {
let file = OpenOptions::new()
.append(true)
.create(true)
.open(path)
.expect("Can't create log file!");

Target::Pipe(Box::new(file))
}
};

env_logger::Builder::from_env(Env::default()
.write_style_or("RUST_LOG_STYLE", "always"))
.format_file(true)
.format_line_number(true)
.filter(None, config.log_config.to_level_filter())
.target(target)
.init();
utils::log::init_logger(
"ForwardProxy",
config.log_config.log_level.clone(),
config.log_config.log_path.clone(),
);

debug!("Loaded ForwardProxyConfig: {:?}", config);
debug!(name: "FPConfig", value = ?config);
config
}

fn main() {
let config = load_config();

info!("Starting server...");

let mut server = Server::new(Some(Opt {
conf: std::env::var("SERVER_CONF").ok(),
..Default::default()
Expand All @@ -59,12 +38,14 @@ fn main() {

let mut proxy = http_proxy_service(
&server.configuration,
ForwardProxy::new(config.tls_config, fp_handler)
ForwardProxy::new(config.tls_config, fp_handler),
);

proxy.add_tcp(&format!("{}:{}", config.listen_address, config.listen_port));

server.add_service(proxy);

info!("Starting server at {}:{}", config.listen_address, config.listen_port);

server.run_forever();
}
Loading
Loading