diff --git a/forward-proxy/.env.dev b/forward-proxy/.env.dev index 31b3166..59fb20c 100644 --- a/forward-proxy/.env.dev +++ b/forward-proxy/.env.dev @@ -14,7 +14,7 @@ AUTH_ACCESS_TOKEN=eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE5MTYyMzkwMjIsInVzZXJfaWQiOiIxM AUTH_GET_CERTIFICATE_URL=http://localhost:5001/sp-pub-key?backend_url= # mTLS configurations -ENABLE_TLS=true +ENABLE_TLS=false CA_CERT="-----BEGIN CERTIFICATE-----\nMIIF0zCCA7ugAwIBAgIUQbJs4Jhd2NiSeL0Y+SZ4z7W6l7swDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MDQ1OFoXDTM1MDkyMjE5\nMDQ1OFowcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRMwEQYDVQQDDAptVExTUm9vdENBMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A\nMIICCgKCAgEAmQsqgvtgfiQvhaziGzWMAZRzCViScRCrwkvmeE2U6Q/etk1PacuH\ndkJAvDQgLBsXCThg6G0hjw6BHX/i5seBQ6XHBNXEhAR70X3Y9VDCOoOLbqI22XAH\nZrX5gswBUzXbxxbxPrc7lK8N8s2M/LS4ADo9QWu4gtDnfr2eEWQR1P7o8llIn3z+\nGJStvzD0Jlfa39Gv9sIjPsoe7hvSpDiYqwGYDpeBLxn2SU39tyyyxDOH5NrpHG0z\nYMh20kK/0qkiVknd7/oqbSIv1EKfLLnw7p09LmsaHhlR+w+kQt6Z2Dzvae1qMj/D\nWnf1fgBOThZdnfUDznFGLjhPpGIvyjB8zk16wgYW7OIslqqbPPSLWm3ls3qpU0pl\nn2ROOWSLufXobh0Ly8vLLGVN8mwYeaTyXK4ELC0ScrpzBJbQ0mN3xulCMulk54iZ\nLy8KA1DtiwWyyV1cCvb36kfEPavU9zCYUgjvpVp3+fato0DBqe4ukMBUlb5C9qQ0\nKEkz/z75yWAe6bSVqXnYVuIihoxumM6oRMJwfOeOU0yyIfIkhwmDxw+yqeUDVibw\nseq5Ul4qVulocSAYPUX4ym/rPqC0SI+Z3EJZSLbiwjh1jmMiLbrsguYROSIOFrVO\noyYmjCUnc5Dxs04Q8VK80NuDFxCVEaK19esxFzCcFYDTiXoocem6EccCAwEAAaNj\nMGEwHQYDVR0OBBYEFJOQTjwIZb078bHcHzxNnkUiCKW1MB8GA1UdIwQYMBaAFJOQ\nTjwIZb078bHcHzxNnkUiCKW1MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQD\nAgGGMA0GCSqGSIb3DQEBCwUAA4ICAQAyCYD4IShYuU66OQAPz0/P4rVUr+mfKX0s\nYhnWRY+hLm4M0zp51/H9XGnHUZwGf3No3I0vWFDZHZShKt/GemCZy/WOJzjDGH71\ncNraMm/Bn2c7RXEqzK0qaKdgVo4vrS9inGExGtTRywEc+fLgH+myVR+Dy6JfxpeW\nFarMpu5AyoAWYRwUVaEQXotBF/XZHetaYr1mk9p3qNQkTEnv8zOWFF5Sv5R0I3XM\nABqk5Lz2WyRJIIL0g/9TdWZF88KrQhnr1q9JnTgAg3aTvyTR8c+Oik+BlRUe1zw1\n7TUvuVJkR50uEKZ0IVdWGtNCEzPl1PPMxzUAomozhZRh7vhuBk+yThaDByinaSvC\nxVzufk3hjrnPA0+ZyD4nVcMs3oivl5Vxli/KJrdYiQoErkbTmEgDCzSm0g/14BvJ\nHmlpITgolQGA1SFQCJcrioYn1Eyv4hSrnQtwP+t3t/G4OHbbEeu2pgqvZKEMngNL\naUGla0fwJtytoAqspWxuFwYxJWl5cse1xpsbrf5aiDphqCfSZARr8jzeYtYF8sex\n/8yzy1YmgDlSLng0kFqjLgP8SHLEgKMCOCP/haAHBaDqIu/9FMPofqlG0YJuqTB3\ndf0zohxxcIDZn17qElOVCRoUQVoz2rbYKHHZ8/MVWM7lPmUIqt2G6HlcMo1xYUa4\nfgCQMqEubg==\n-----END CERTIFICATE-----" CERT="-----BEGIN CERTIFICATE-----\nMIIFtTCCA52gAwIBAgIUXKLNUFsfUc3xSpmCKj35dAP3oe8wDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MTAyNVoXDTM1MDkyMjE5\nMTAyNVowdDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRYwFAYDVQQDDA1mb3J3YXJkLXByb3h5MIICIjANBgkqhkiG9w0BAQEFAAOC\nAg8AMIICCgKCAgEAq4JLgH1rmbSe2/6VFydzdxGsq0oDE9cKOagWxdd9lFqen+LL\nB+vDFV6d0n+CHDkhLsgi5ASGhSl5o232FI/Tgf4YqHQ0C3touGs2ysuI51WT2hzV\nPOFHemgveHNKk+yVHabZp5ypm4kYgMegknhFAtKytAC4g6IcHUX+t9gz+ucA0D8C\n+kuYJiaVd2IML8rF4RCGruZ6WuOcLeHF7byfucT/r9tT3IhdbHtLWOOw6mmtRuAl\nUexiXN+ipxmaAgSyAZrR9CwOo2xp9p5XL2Mn3VKHa6AGZya8QobnN16Dm+JOCoIl\n/C8b7w2H9mC6/GA2kmdxuD0+MyWFnK9DrbT+VYBgubXHeyKAR9mddn0ur7WNBeMl\ntNKufyS9J0Od3S8dERjw05Fl7jGJJj82B1zR/bKbq3v3IIteeTHm6sea2fV0vIw1\n2MBUF5ax60Jdeo71vHf4CVg01F3LBb9ar1LRVNgvrqThach6CDbvHPICXEFHV9OP\nil8PHIgUwon5iySfzfjPxIF0EnkGUR7B5iFYlcG1vSvbSW30sGQ67a9RWBfIll1m\nON1PhLZ5rnslgj6KxNReheaLSe/u4f17QxMrwLw/W3vwOm2SymUcMpwCAaLYtxd+\n6W6zaKy9g0wXeUaEbKBzaOdLqixJ4mL6NVYjEqlXJ5BuQ25iauafpRJz/vcCAwEA\nAaNCMEAwHQYDVR0OBBYEFHrJpAogeNfUj4GhwcrHVYMzXhMeMB8GA1UdIwQYMBaA\nFJOQTjwIZb078bHcHzxNnkUiCKW1MA0GCSqGSIb3DQEBCwUAA4ICAQBdQuPBEGma\nU4puro0xFlpIeOVOUf7eVMHeJuBCiDmBr8C+kae9XuShRYP/OMgGxQ7Y95BJo/C7\nTMwF5ugDQrsRod4EaTEO8ZXGcZq91v4YfegAJ9gQpR/OdniO5dDxfPMlY/VZEyK9\nwHIaUVUP+fiWk9yDwuTPZz+0xF3x2oujBO0JZTsHAbGaZ0CtNW1M5JwFF6aVD5Fs\nPave34wfDEbUg/7WKnZy6pNoxlkgdEsS1QThQ817pE9tTpGJS+9r+JjTSg9AL/W9\nQTvBB96WUsrRqJ2V+XXZ6FjDzPmWC4cfABh24S1NIdygB6uUJ6WLg8HN9tIjH+6O\noDvxCBjUE1444O6c9Z6cdxBvdICrIRN9wc1WhspBqRf4nXtRidFyrV2KDW9TMQXZ\nMOGXrhqgC1lwtKTc2iT8x74qQe8ARSArKNeTjJEiGwbc7+9wG8lf0b0zKWxwlTmg\nmAy8jVEkHmLvkZqKmaW59wUxkaAiBO2TNwjiwDkwPY8ET6DyGd/x4oG6QdocKF87\naOetHnMjTSpUj01sKFpveyTqoAF0UNilmJsUE2AO0BHsxj7Pe+ysLx1yUPiuKJX2\nUMHK/obbGgwu/5BGDXoafQlYu77ZyL/4oqnGYk/s7IGJaw/yVOrmGuuaF0h8RJfV\nOYxd3wTVC8r46A49VExTqvuYdwaAiovdaw==\n-----END CERTIFICATE-----" KEY="-----BEGIN PRIVATE KEY-----\nMIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQCrgkuAfWuZtJ7b\n/pUXJ3N3EayrSgMT1wo5qBbF132UWp6f4ssH68MVXp3Sf4IcOSEuyCLkBIaFKXmj\nbfYUj9OB/hiodDQLe2i4azbKy4jnVZPaHNU84Ud6aC94c0qT7JUdptmnnKmbiRiA\nx6CSeEUC0rK0ALiDohwdRf632DP65wDQPwL6S5gmJpV3YgwvysXhEIau5npa45wt\n4cXtvJ+5xP+v21PciF1se0tY47Dqaa1G4CVR7GJc36KnGZoCBLIBmtH0LA6jbGn2\nnlcvYyfdUodroAZnJrxChuc3XoOb4k4KgiX8LxvvDYf2YLr8YDaSZ3G4PT4zJYWc\nr0OttP5VgGC5tcd7IoBH2Z12fS6vtY0F4yW00q5/JL0nQ53dLx0RGPDTkWXuMYkm\nPzYHXNH9spure/cgi155Mebqx5rZ9XS8jDXYwFQXlrHrQl16jvW8d/gJWDTUXcsF\nv1qvUtFU2C+upOFpyHoINu8c8gJcQUdX04+KXw8ciBTCifmLJJ/N+M/EgXQSeQZR\nHsHmIViVwbW9K9tJbfSwZDrtr1FYF8iWXWY43U+EtnmueyWCPorE1F6F5otJ7+7h\n/XtDEyvAvD9be/A6bZLKZRwynAIBoti3F37pbrNorL2DTBd5RoRsoHNo50uqLEni\nYvo1ViMSqVcnkG5DbmJq5p+lEnP+9wIDAQABAoICACmQw1GZk9lFf/abJXDeG8qw\nmuNMZaCKTi0ZAqPiDMpGiAkBwujhh38HVkJsqpDCe7tFv8b5Hczp91PXU3s6PC1V\n8o3o07AwsXl4amgNmdlO0S1cLYW6p0MQOuj7MAjXnm/4PumzOxu5xxl2yACXa0o6\n3Bppzk4AnMWvcAMIP9i/4V+W1dbpOS+NzE2JkqCGiRx5j9qVevPKE9C+1eQ/AYrZ\nJoptIk7hMZsX6nPZgsfc4qS5r/HB0zjk7huHRd7VWnqvFdESWF3c6XVefIy8gC3Q\nUYeQ2dxn89o/rYuquUSvPPCpCCGtHRz7b4cTfF2rx64FqfbXyNpGbrJBe6p+oeCb\nRnB0vnEmiEIvw4C0P0rGz9nOS69TaphTfRFVmDOFgwYDNzZz3DD/YdsQY1XMQ6pH\n+m4Nr7+EPjFk6AYSgvbmRLL8WGGI6Tv9/ezVvdflPClYLMjOGIyVPRfOldgad01T\nlI/vWDHITWJZqHwh4VBZvQaUeoRPcuabMeBIV3d7FBcsQqOwRbpbL/Piu8ydZJ8Y\n+5gUYfh/F7ouJGyuFm77R2uEhSjpbJMxu1Mku2SqG2gAime5YJBRxExUAXky9QyO\nXic4XxX67iFA4P3ATkFA5kQF3m+1uuwfYC3gIs+aiZuAuQuZ4OO5Ay3TxFCMeuTk\nfsS+COfEhsdXUG7Wn2aZAoIBAQDYClEGkfeftTvyXYbISRGeG/Nni3UUS4aNSIsv\nRNlAKnD+n60VwiBpOTr6IEP1MB1eseQvvNVqiemb5Oa8jaDV8Gkon0uBHTBQo62K\nFIRMsM8jsMGvO0RRtd/2iWRu1WnzqzA5TXX2sN/whjxeT0DUrnGKsqUI6PT8STjj\nQo/IHT7uulJPnbVZ+G+CZyQKjaBaOOy6VMYsDXhMm71zNQkM2AgzOYSLwpqtSAHM\nwHWHGkZGvetsVL6hHcLcWVxkuo7saeMcLMPfYBLkV8oX/HayZ0oc+x2RYslQwokq\ng7oFEsvQjb166MXD5gSaOjAkjb9FshBsxU5XKC3gA1xqbOo1AoIBAQDLO2Izu1Nu\nhsJ4UMWgfeYSf9/i8T5CXGYs9VytYNCFpBc530RQ4olu+SY9vrUt3+OW3xeDtWX/\nMm/pq0DxpjkvD5y5fYnsBI2Cd3o3L6WKEsqKnBOJbS5omrcwWSOO18z+8+BF9O55\n+RTR8dWHw+XxvajFTqzcLojtvExyvRwKI3WVaNtZWgU7YmsVxoQ93/PIYIKgdbhI\nv+tbc8DuxnbDA+qJhFipuTI07fclE/hupmgmd2sDiFjiyPbk85dXpLsxnAhlE9ZT\n2YSBhVQz0a68CRiVUcoWv7FsMKlgJtfICmzDH3qrB00Dp7FhXinlubrgMWy+kw5v\n6k/h2WqZhIn7AoIBAQDMOPRfQZzTXH9OnRrMOkZtL/7n4uzKQpru86SfCnZUMcqe\n6FK2Psxkq7UUvWuAW/tniMIsXlVgYP50X+2+UCO6GYlO3UaCxxTlJdTmsn5eAMXO\n90ggXeY3V7ZfV4GZRCTkMu9jO9ZHXOxUcpCelkyywDSU6EsaIR11X8JnEoTYpszW\n30rv+CV252KB4v4u+7KZlzYw7fJnslQGFzL/tSLZAV6/DaA+fbe6FledNlHjZPMJ\n7H6f6XxK0ddidRbiIXj6Ax6tg6OlhSxWrqZcBkwuWXW176wDw16K+Vqw1dUC9sG0\nZEi551EL3mR7ZoYcB+LH/4uHRvzHZzP2jzbNZCgBAoIBAQCEuVN41W23UOrQCHAI\nUDBhBIICg+pVDGLuGY9c601C+dbxRI4pBMkcYDpJOLK6Mu0/KpMAwQbLkvTjdaQE\nLLpLsbZ4rTPVn2OLQNvgDo3djkgYHbXkmhkk12WrfYtrTiPinQJqrXrQzYp7UaRR\n9e3F4kbGFItvgDSMjdyfUkFtnZq86K3XvKKOFcg5gFv8zLU4t06X3EltuWjLYN0v\nEw2cboJNLNF6hifzyTUOUex81tBNzs9kjzb9ZKFZBHxiEILv8ybIXBwsxnFy5NAI\nx3eF9arIWZHRKX+FWIJE+RkS2zwMchJ6f1ocePeuzwAttw4EPEL4crGLBUsGBCdJ\n+vThAoIBAQC1KrUB170d08W6WMdK8sbzL67M5pj8B44yp6650nNAVJhXMRuw846X\nH1KqUzV1elQdmvKJDogkUYllRVByHGX05Lr5BGAkmDxBMqYxUn56IhEfQ18ejy6x\nzUFwg4hLLWZzNqkIwGNb5B9wFzHFy39vwIlsy1mMIswxjucsH/lENRaPF2Lr5Tia\n1VRDJqXSZ2ytGyKsgBhFXsZUXmnxBNmjFuI62ROhOlrYJ3bwfTRDwkqQQWxPDj7f\nfTK7R5D69flh4CBEM6oSIZm7KEMXW/C5AkYc/jjW3/bDNuVy7JpZ/aokTxHCqW8e\n7zyoZqd4bAj7xvPuj8lMdThXtKSz86hK\n-----END PRIVATE KEY-----" diff --git a/forward-proxy/Cargo.toml b/forward-proxy/Cargo.toml index ec1ced1..7c4b52a 100644 --- a/forward-proxy/Cargo.toml +++ b/forward-proxy/Cargo.toml @@ -4,24 +4,30 @@ version = "0.3.5" edition = "2024" [dependencies] +pingora-router = { path = "../pingora-router", version = "0.1.0" } async-trait = "0.1" bytes = "1.10.1" -clap = { version = "3.2.25", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" -log = "0.4.27" chrono = "0.4.40" -reqwest = { version="0.11", default-features=false, features=["json", "rustls-tls"] } -tokio = { version = "1.44.2", features = ["rt-multi-thread", "macros"] } +reqwest = { version = "0.11", default-features = false, features = [ + "json", + "rustls-tls", +] } pingora = { version = "0.5.0", features = ["lb", "boringssl"] } -jsonwebtoken = "9.3.1" dotenv = "0.15.0" -futures = "0.3.31" -pingora-router = { path = "../pingora-router", version = "0.1.0" } -once_cell = "1.21.3" pingora-error = "0.5.0" boring = "4.17.0" -env_logger = "0.11.8" utils = { path = "../utils", version = "0.1.0" } hex = "0.4.3" envy = "0.4.2" +hotpath = { git = "https://github.com/pawurb/hotpath.git", branch = "develop", optional = true } +tracing = "0.1" +tokio = { version = "1.48.0", features = ["full"] } + +[features] +hotpath = ["dep:hotpath", "hotpath/hotpath"] +hotpath-alloc-bytes-total = ["hotpath/hotpath-alloc-bytes-total"] +hotpath-alloc-count-total = ["hotpath/hotpath-alloc-count-total"] +hotpath-alloc-self = ["hotpath/hotpath-alloc-self"] +hotpath-off = ["hotpath/hotpath-off"] diff --git a/forward-proxy/benchmarks/base_impl.txt b/forward-proxy/benchmarks/base_impl.txt new file mode 100644 index 0000000..d74f884 --- /dev/null +++ b/forward-proxy/benchmarks/base_impl.txt @@ -0,0 +1,27 @@ +[hotpath] timing - Execution duration of functions. +guard_timeout::main: 60.00s ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| Function | Calls | Avg | P50 | P95 | P99 | Total | % Total | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| guard_timeout::main | 1 | 60.00s | 10.01s | 10.01s | 10.01s | 60.00s | 100.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::get_public_key | 5 | 13.61ms | 8.72ms | 36.96ms | 36.96ms | 68.07ms | 0.11% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::handle_init_tunnel_request | 5 | 13.65ms | 8.73ms | 37.03ms | 37.03ms | 68.26ms | 0.11% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::response_body_filter | 162 | 46.88µs | 15.46µs | 172.03µs | 657.41µs | 7.59ms | 0.01% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::response_filter | 26 | 123.00ns | 83.00ns | 167.00ns | 1.33µs | 3.21µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::upstream_request_filter | 26 | 89.00ns | 83.00ns | 167.00ns | 459.00ns | 2.33µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::handle_init_tunnel_response | 5 | 197.25µs | 98.75µs | 653.82µs | 653.82µs | 986.25µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::request_filter | 41 | 702.00ns | 291.00ns | 1.08µs | 12.38µs | 28.79µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::upstream_peer | 30 | 301.00ns | 250.00ns | 875.00ns | 1.00µs | 9.04µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::verify_int_fp_jwt | 43 | 45.69µs | 23.76µs | 83.90µs | 680.45µs | 1.96ms | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::request_body_filter | 56 | 214.00ns | 42.00ns | 625.00ns | 4.92µs | 12.00µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ \ No newline at end of file diff --git a/forward-proxy/benchmarks/reduce_cloning_in_pringora_router.txt b/forward-proxy/benchmarks/reduce_cloning_in_pringora_router.txt new file mode 100644 index 0000000..7d2b550 --- /dev/null +++ b/forward-proxy/benchmarks/reduce_cloning_in_pringora_router.txt @@ -0,0 +1,27 @@ +[hotpath] timing - Execution duration of functions. +guard_timeout::main: 60.01s ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| Function | Calls | Avg | P50 | P95 | P99 | Total | % Total | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| guard_timeout::main | 1 | 60.01s | 10.01s | 10.01s | 10.01s | 60.01s | 100.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::get_public_key | 5 | 13.71ms | 8.78ms | 33.01ms | 33.01ms | 68.54ms | 0.11% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::handle_init_tunnel_request | 5 | 13.76ms | 8.83ms | 33.05ms | 33.05ms | 68.78ms | 0.11% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::verify_int_fp_jwt | 46 | 33.89µs | 17.26µs | 85.89µs | 293.38µs | 1.56ms | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| handler::handle_init_tunnel_response | 5 | 42.98µs | 30.85µs | 96.77µs | 96.77µs | 214.92µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::request_filter | 40 | 490.00ns | 208.00ns | 1.92µs | 2.96µs | 19.62µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::upstream_request_filter | 28 | 58.00ns | 42.00ns | 84.00ns | 84.00ns | 1.63µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::upstream_peer | 28 | 297.00ns | 208.00ns | 750.00ns | 1.25µs | 8.33µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::response_body_filter | 72 | 25.17µs | 8.84µs | 51.94µs | 768.51µs | 1.81ms | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::request_body_filter | 58 | 116.00ns | 42.00ns | 458.00ns | 500.00ns | 6.75µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ +| proxy::response_filter | 28 | 77.00ns | 42.00ns | 208.00ns | 250.00ns | 2.17µs | 0.00% | ++--------------------------------------+-------+----------+----------+----------+----------+----------+---------+ \ No newline at end of file diff --git a/forward-proxy/src/config.rs b/forward-proxy/src/config.rs index 92e7880..e5da437 100644 --- a/forward-proxy/src/config.rs +++ b/forward-proxy/src/config.rs @@ -39,19 +39,19 @@ pub struct TlsConfig { pub key: String, } -impl LogConfig { - pub fn to_level_filter(&self) -> log::LevelFilter { - match self.log_level.to_uppercase().as_str() { - "INFO" => log::LevelFilter::Info, - "DEBUG" => log::LevelFilter::Debug, - "WARNING" => log::LevelFilter::Warn, - "ERROR" => log::LevelFilter::Error, - "TRACE" => log::LevelFilter::Trace, - "OFF" => log::LevelFilter::Off, - _ => log::max_level() - } - } -} +// impl LogConfig { +// pub fn to_level_filter(&self) -> log::LevelFilter { +// match self.log_level.to_uppercase().as_str() { +// "INFO" => log::LevelFilter::Info, +// "DEBUG" => log::LevelFilter::Debug, +// "WARNING" => log::LevelFilter::Warn, +// "ERROR" => log::LevelFilter::Error, +// "TRACE" => log::LevelFilter::Trace, +// "OFF" => log::LevelFilter::Off, +// _ => log::max_level() +// } +// } +// } impl TlsConfig { pub fn load(&mut self) -> Result<(), String> { diff --git a/forward-proxy/src/handler/mod.rs b/forward-proxy/src/handler/mod.rs index 3efd032..e5b87cf 100644 --- a/forward-proxy/src/handler/mod.rs +++ b/forward-proxy/src/handler/mod.rs @@ -1,20 +1,23 @@ -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use log::{error, info}; +use crate::config::HandlerConfig; +use crate::handler::types::request::InitTunnelRequest; +use crate::handler::types::response::{ + ErrorResponse, FpHealthcheckError, FpHealthcheckSuccess, InitTunnelResponseFromRP, + InitTunnelResponseToINT, +}; use pingora::http::StatusCode; -use reqwest::Client; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; -use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait}; -use crate::handler::types::response::{ErrorResponse, FpHealthcheckError, FpHealthcheckSuccess, InitTunnelResponseFromRP, InitTunnelResponseToINT}; use pingora_router::handler::ResponseBodyTrait; +use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait}; +use reqwest::Client; use serde::Deserialize; -use crate::handler::types::request::InitTunnelRequest; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tracing::{error, info, trace}; use utils; use utils::jwt::JWTClaims; -use crate::config::HandlerConfig; -pub mod types; pub mod consts; +pub mod types; pub struct ForwardHandler { pub config: HandlerConfig, @@ -43,31 +46,45 @@ impl ForwardHandler { } } + #[cfg_attr(feature = "hotpath", hotpath::measure)] async fn get_public_key( &self, backend_url: String, ctx: &mut Layer8Context, - ) -> Result - { + ) -> Result { let client = Client::new(); - let res = client.get( - //todo - // the input backend_url is originally from interceptor request, - // the interceptor only accepts URLs with http(s) scheme. - // But the authentication server expects a URL without scheme - format!( - "{}{}", - self.config.auth_get_certificate_url, - backend_url.replace("http://", "").replace("https://", "") + let mut val = format!( + "{}{}", + self.config.auth_get_certificate_url, + backend_url.replace("http://", "").replace("https://", "") + ); + + info!("Getting public key from: {}", val); + val = "http://localhost:5001/sp-pub-key?backend_url=http://localhost:6193".to_string(); + + trace!( + "----------------------\n AccessToken {token}\n--------------------", + token = self.config.auth_access_token + ); + + let res = client + .get( + //todo + // the input backend_url is originally from interceptor request, + // the interceptor only accepts URLs with http(s) scheme. + // But the authentication server expects a URL without scheme + val, + ) + .header( + "Authorization", + format!("Bearer {}", self.config.auth_access_token), ) - ) - .header("Authorization", format!("Bearer {}", self.config.auth_access_token)) .send() .await .map_err(|e| { let response_body = ErrorResponse { - error: format!("Failed to connect to layer8: {}", e) + error: format!("Failed to connect to layer8: {}", e), }; APIHandlerResponse { @@ -78,7 +95,10 @@ impl ForwardHandler { if !res.status().is_success() { let response_body = ErrorResponse { - error: format!("Failed to get public key from layer8, status code: {}", res.status().as_u16()), + error: format!( + "Failed to get public key from layer8, status code: {}", + res.status().as_u16() + ), }; error!("Sending error response: {:?}", response_body); @@ -102,8 +122,8 @@ impl ForwardHandler { } })?; - let pub_key = utils::cert::extract_x509_pem(cert.x509_certificate.clone()) - .map_err(|e| { + let pub_key = + utils::cert::extract_x509_pem(cert.x509_certificate.clone()).map_err(|e| { error!("Failed to parse x509 certificate: {:?}", e); APIHandlerResponse { status: StatusCode::INTERNAL_SERVER_ERROR, @@ -121,10 +141,8 @@ impl ForwardHandler { } /// Verify `int_fp_jwt` and return `fp_rp_jwt` - pub fn verify_int_fp_jwt( - &self, - token: &str, - ) -> Result { + #[cfg_attr(feature = "hotpath", hotpath::measure)] + pub fn verify_int_fp_jwt(&self, token: &str) -> Result { return match utils::jwt::verify_jwt_token(token, &self.config.jwt_virtual_connection_key) { Ok(_claims) => { // todo check claims if needed @@ -133,47 +151,48 @@ impl ForwardHandler { let jwts = self.jwts_storage.lock().unwrap(); jwts.get(token).cloned() } { - None => { - Err("token not found!".to_string()) - } - Some(session) => Ok(session) + None => Err("token not found!".to_string()), + Some(session) => Ok(session), } } - Err(err) => Err(err.to_string()) + Err(err) => Err(err.to_string()), }; } /// Validate request body and get ntor certificate for the given backend URL. + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub async fn handle_init_tunnel_request(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { // validate request body - let received_body = match ForwardHandler::parse_request_body::< - InitTunnelRequest, - ErrorResponse - >(&ctx.get_request_body()) - { - Ok(res) => res.to_bytes(), - Err(Some(e)) => { - return APIHandlerResponse { - status: StatusCode::BAD_REQUEST, - body: Some(e.to_bytes()), - }; - } - Err(None) => { - return APIHandlerResponse { - status: StatusCode::BAD_REQUEST, - body: None, - }; - } - }; + let received_body = + match ForwardHandler::parse_request_body::( + &ctx.get_request_body(), + ) { + Ok(res) => res.to_bytes(), + Err(Some(e)) => { + return APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: Some(e.to_bytes()), + }; + } + Err(None) => { + return APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: None, + }; + } + }; // get public key to initialize encrypted tunnel { // it's safe to use unwrap here because this param was already checked in `request_filter` - let backend_url = ctx.param("backend_url").unwrap_or(&"".to_string()).to_string(); + let backend_url = ctx + .param("backend_url") + .unwrap_or(&"".to_string()) + .to_string(); let server_certificate = match self.get_public_key(backend_url.to_string(), ctx).await { Ok(cert) => cert, - Err(err) => return err + Err(err) => return err, }; info!("Server certificate: {:?}", server_certificate); @@ -193,11 +212,18 @@ impl ForwardHandler { } } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn handle_init_tunnel_response(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { - let ntor_server_id = ctx.get(&consts::CtxKeys::NTorServerId.to_string()).unwrap_or(&"".to_string()).clone(); + let ntor_server_id = ctx + .get(&consts::CtxKeys::NTorServerId.to_string()) + .unwrap_or(&"".to_string()) + .clone(); let ntor_static_public_key = hex::decode( - ctx.get(&consts::CtxKeys::NTorStaticPublicKey.to_string()).clone().unwrap_or(&"".to_string()) - ).unwrap_or_default(); + ctx.get(&consts::CtxKeys::NTorStaticPublicKey.to_string()) + .clone() + .unwrap_or(&"".to_string()), + ) + .unwrap_or_default(); let response_body = ctx.get_response_body(); @@ -217,7 +243,10 @@ impl ForwardHandler { }; let int_fp_session = IntFPSession { - rp_base_url: ctx.param("backend_url").unwrap_or(&"".to_string()).to_string(), + rp_base_url: ctx + .param("backend_url") + .unwrap_or(&"".to_string()) + .to_string(), fp_rp_jwt: res_from_rp.fp_rp_jwt, }; @@ -241,12 +270,14 @@ impl ForwardHandler { }; } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn handle_healthcheck(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { if let Some(error) = ctx.param("error") { if error == "true" { let response_bytes = FpHealthcheckError { - fp_healthcheck_error: "this is placeholder for a custom error".to_string() - }.to_bytes(); + fp_healthcheck_error: "this is placeholder for a custom error".to_string(), + } + .to_bytes(); ctx.insert_response_header("x-fp-healthcheck-error", "response-header-error"); return APIHandlerResponse { @@ -258,7 +289,8 @@ impl ForwardHandler { let response_bytes = FpHealthcheckSuccess { fp_healthcheck_success: "this is placeholder for a custom body".to_string(), - }.to_bytes(); + } + .to_bytes(); ctx.insert_response_header("x-fp-healthcheck-success", "response-header-success"); @@ -267,4 +299,4 @@ impl ForwardHandler { body: Some(response_bytes), }; } -} \ No newline at end of file +} diff --git a/forward-proxy/src/main.rs b/forward-proxy/src/main.rs index fdabaf2..3e43571 100644 --- a/forward-proxy/src/main.rs +++ b/forward-proxy/src/main.rs @@ -1,14 +1,13 @@ -mod proxy; -mod handler; -mod config; - -use std::fs::OpenOptions; -use crate::handler::ForwardHandler; -use env_logger::{Env, Target}; -use log::{debug, info}; -use proxy::ForwardProxy; use pingora::prelude::*; +use proxy::ForwardProxy; +use tracing::{debug, info}; + use crate::config::FPConfig; +use crate::handler::ForwardHandler; + +mod config; +mod handler; +mod proxy; fn load_config() -> FPConfig { // Load environment variables from .env file @@ -17,34 +16,49 @@ fn load_config() -> FPConfig { // Deserialize from env vars let mut config: FPConfig = envy::from_env().expect("Failed to load config"); - config.tls_config.load().expect("Failed to load TLS configuration"); - - let target = match config.log_config.log_path.as_str() { - "console" => Target::Stdout, - path => { - let file = OpenOptions::new() - .append(true) - .create(true) - .open(path) - .expect("Can't create log file!"); - - Target::Pipe(Box::new(file)) - } - }; - - env_logger::Builder::from_env(Env::default() - .write_style_or("RUST_LOG_STYLE", "always")) - .format_file(true) - .format_line_number(true) - .filter(None, config.log_config.to_level_filter()) - .target(target) - .init(); + config + .tls_config + .load() + .expect("Failed to load TLS configuration"); + + // let target = match config.log_config.log_path.as_str() { + // "console" => Target::Stdout, + // path => { + // let file = OpenOptions::new() + // .append(true) + // .create(true) + // .open(path) + // .expect("Can't create log file!"); + + // Target::Pipe(Box::new(file)) + // } + // }; + + // env_logger::Builder::from_env(Env::default() + // .write_style_or("RUST_LOG_STYLE", "always")) + // .format_file(true) + // .format_line_number(true) + // .filter(None, config.log_config.to_level_filter()) + // .target(target) + // .init(); debug!("Loaded ForwardProxyConfig: {:?}", config); config } fn main() { + #[cfg(feature = "hotpath")] + let guard = hotpath::GuardBuilder::new("guard_timeout::main") + .percentiles(&[50, 95, 99]) + .build(); + + #[cfg(feature = "hotpath")] + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(60)); + drop(guard); + std::process::exit(0); + }); + let config = load_config(); info!("Starting server..."); @@ -52,14 +66,15 @@ fn main() { let mut server = Server::new(Some(Opt { conf: std::env::var("SERVER_CONF").ok(), ..Default::default() - })).expect("Failed to create server"); + })) + .expect("Failed to create server"); server.bootstrap(); let fp_handler = ForwardHandler::new(config.handler_config); let mut proxy = http_proxy_service( &server.configuration, - ForwardProxy::new(config.tls_config, fp_handler) + ForwardProxy::new(config.tls_config, fp_handler), ); proxy.add_tcp(&format!("{}:{}", config.listen_address, config.listen_port)); diff --git a/forward-proxy/src/proxy.rs b/forward-proxy/src/proxy.rs index 39e32d0..d160c89 100644 --- a/forward-proxy/src/proxy.rs +++ b/forward-proxy/src/proxy.rs @@ -1,7 +1,10 @@ +use crate::config::TlsConfig; +use crate::handler::consts::HeaderKeys; +use crate::handler::types::response::ErrorResponse; +use crate::handler::{ForwardHandler, consts}; use async_trait::async_trait; use boring::x509::X509; use bytes::Bytes; -use log::{debug, error, info}; use pingora::Error; use pingora::OrErr; use pingora::http::{RequestHeader, ResponseHeader, StatusCode}; @@ -9,16 +12,13 @@ use pingora::listeners::tls::TLS_CONF_ERR; use pingora::prelude::{HttpPeer, ProxyHttp, Session}; use pingora::upstreams::peer::PeerOptions; use pingora::utils::tls::CertKey; +use pingora_error::ErrorType; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; +use pingora_router::handler::ResponseBodyTrait; use reqwest::header::TRANSFER_ENCODING; use std::sync::Arc; use std::time::Duration; -use pingora_error::ErrorType; -use pingora_router::handler::{ResponseBodyTrait}; -use crate::config::TlsConfig; -use crate::handler::consts::HeaderKeys; -use crate::handler::{consts, ForwardHandler}; -use crate::handler::types::response::ErrorResponse; +use tracing::{debug, error, info}; pub struct ForwardProxy { tls_config: TlsConfig, @@ -52,9 +52,12 @@ impl ProxyHttp for ForwardProxy { mut e: Box, ) -> Box { let mut retry = false; - if e.etype == ErrorType::ConnectTimedout || e.etype == ErrorType::ConnectError - || e.etype == ErrorType::ConnectRefused { - let mut addrs = ctx.get(&consts::CtxKeys::UpstreamAddress.to_string()) + if e.etype == ErrorType::ConnectTimedout + || e.etype == ErrorType::ConnectError + || e.etype == ErrorType::ConnectRefused + { + let mut addrs = ctx + .get(&consts::CtxKeys::UpstreamAddress.to_string()) .unwrap_or(&"".to_string()) .clone(); @@ -69,13 +72,16 @@ impl ProxyHttp for ForwardProxy { } error!( "Failed to connect to upstream addr: {}, err: {}, retry: {}", - peer._address.to_string(), e, retry + peer._address.to_string(), + e, + retry ); } e.set_retry(retry); e } + #[cfg_attr(feature = "hotpath", hotpath::measure)] async fn upstream_peer( &self, _session: &mut Session, @@ -94,8 +100,14 @@ impl ProxyHttp for ForwardProxy { // // Code below is for step 4(this is a client to RP), presenting the client's TLS certificate. - let addrs = ctx.get(&consts::CtxKeys::UpstreamAddress.to_string()).unwrap_or(&"".to_string()).clone(); - let sni = ctx.get(&consts::CtxKeys::UpstreamSNI.to_string()).unwrap_or(&"".to_string()).clone(); + let addrs = ctx + .get(&consts::CtxKeys::UpstreamAddress.to_string()) + .unwrap_or(&"".to_string()) + .clone(); + let sni = ctx + .get(&consts::CtxKeys::UpstreamSNI.to_string()) + .unwrap_or(&"".to_string()) + .clone(); debug!("upstream_addr: {}, upstream_sni: {}", addrs, sni); // HttpPeer cannot connect to upstream without a valid socket(IP:PORT) address. @@ -112,9 +124,15 @@ impl ProxyHttp for ForwardProxy { break; } Err(err) => { - error!("Panic occurred while creating HttpPeer for {}: {:?}", addr, err); + error!( + "Panic occurred while creating HttpPeer for {}: {:?}", + addr, err + ); address_list.retain(|&x| x != addr); - ctx.set(consts::CtxKeys::UpstreamAddress.to_string(), address_list.join(",")); + ctx.set( + consts::CtxKeys::UpstreamAddress.to_string(), + address_list.join(","), + ); } } } @@ -134,8 +152,9 @@ impl ProxyHttp for ForwardProxy { let ca_cert = X509::from_pem(&self.tls_config.ca_cert.clone().into_bytes()) .or_err(TLS_CONF_ERR, "Failed to load CA certificate")?; - let key = boring::pkey::PKey::private_key_from_pem(&self.tls_config.key.clone().into_bytes()) - .or_err(TLS_CONF_ERR, "Failed to load private key")?; + let key = + boring::pkey::PKey::private_key_from_pem(&self.tls_config.key.clone().into_bytes()) + .or_err(TLS_CONF_ERR, "Failed to load private key")?; // The certificate to present in mTLS connections to upstream // The organization implementing mTLS acts as its own certificate authority. @@ -156,6 +175,7 @@ impl ProxyHttp for ForwardProxy { Ok(Box::new(peer)) } + #[cfg_attr(feature = "hotpath", hotpath::measure)] async fn request_filter( &self, session: &mut Session, @@ -188,7 +208,7 @@ impl ProxyHttp for ForwardProxy { let mut error_response_bytes: Vec = vec![]; match ( session.req_header().uri.path(), - session.req_header().method.as_str() + session.req_header().method.as_str(), ) { ("/healthcheck", "GET") => { let handler_response = self.handler.handle_healthcheck(ctx); @@ -198,9 +218,15 @@ impl ProxyHttp for ForwardProxy { header .insert_header(key.clone(), val.clone()) .map_err(|e| { - error!("Cannot add request header {}:{:?}, err: {:?}", key.clone(), val.clone(), e) - }).unwrap_or_default(); - }; + error!( + "Cannot add request header {}:{:?}, err: {:?}", + key.clone(), + val.clone(), + e + ) + }) + .unwrap_or_default(); + } let mut response_bytes = vec![]; if let Some(body_bytes) = handler_response.body { @@ -213,7 +239,10 @@ impl ProxyHttp for ForwardProxy { session.write_response_header_ref(&header).await?; println!(); - info!("[RESPONSE {}] Header: {:?}", request_summary, header.headers); + info!( + "[RESPONSE {}] Header: {:?}", + request_summary, header.headers + ); info!( "[RESPONSE {}] Body: {}", request_summary, @@ -231,70 +260,71 @@ impl ProxyHttp for ForwardProxy { ("/init-tunnel", "POST") => { if let Some(url) = ctx.param("backend_url") { if let Some(url) = utils::validate_url(url) { - let socket_addr = url.socket_addrs(|| None).unwrap_or_default() + let socket_addr = url + .socket_addrs(|| None) + .unwrap_or_default() .iter() .map(|addr| addr.to_string()) .collect::>() .join(","); - ctx.set( - consts::CtxKeys::UpstreamAddress.to_string(), - socket_addr, - ); + ctx.set(consts::CtxKeys::UpstreamAddress.to_string(), socket_addr); ctx.set( consts::CtxKeys::UpstreamSNI.to_string(), url.domain().unwrap_or_default().to_string(), ); } else { error_response_bytes = ErrorResponse { - error: "Invalid backend_url".to_string() - }.to_bytes(); + error: "Invalid backend_url".to_string(), + } + .to_bytes(); } } else { error_response_bytes = ErrorResponse { - error: "backend_url is a required param".to_string() - }.to_bytes(); + error: "backend_url is a required param".to_string(), + } + .to_bytes(); } } ("/proxy", "POST") => { - error_response_bytes = match ctx.get_request_header().get(HeaderKeys::IntFpJwtKey.as_str()) { - None => { - ErrorResponse { - error: "Missing int_fp_jwt header".to_string() - }.to_bytes() + error_response_bytes = match ctx + .get_request_header() + .get(HeaderKeys::IntFpJwtKey.as_str()) + { + None => ErrorResponse { + error: "Missing int_fp_jwt header".to_string(), } - Some(int_fp_jwt) => { - match self.handler.verify_int_fp_jwt(int_fp_jwt.as_str()) { - Ok(session) => { - debug!("IntFPSession: {:?}", session); - ctx.set(consts::CtxKeys::FpRpJwt.to_string(), session.fp_rp_jwt); - - if let Some(url) = utils::validate_url(&session.rp_base_url) { - let socket_addr = url.socket_addrs(|| None).unwrap_or_default() - .iter() - .map(|addr| addr.to_string()) - .collect::>() - .join(","); - ctx.set( - consts::CtxKeys::UpstreamAddress.to_string(), - socket_addr, - ); - ctx.set( - consts::CtxKeys::UpstreamSNI.to_string(), - url.domain().unwrap_or_default().to_string(), - ); - vec![] - } else { - ErrorResponse { - error: "Invalid backend_url".to_string() - }.to_bytes() + .to_bytes(), + Some(int_fp_jwt) => match self.handler.verify_int_fp_jwt(int_fp_jwt.as_str()) { + Ok(session) => { + debug!("IntFPSession: {:?}", session); + ctx.set(consts::CtxKeys::FpRpJwt.to_string(), session.fp_rp_jwt); + + if let Some(url) = utils::validate_url(&session.rp_base_url) { + let socket_addr = url + .socket_addrs(|| None) + .unwrap_or_default() + .iter() + .map(|addr| addr.to_string()) + .collect::>() + .join(","); + ctx.set(consts::CtxKeys::UpstreamAddress.to_string(), socket_addr); + ctx.set( + consts::CtxKeys::UpstreamSNI.to_string(), + url.domain().unwrap_or_default().to_string(), + ); + vec![] + } else { + ErrorResponse { + error: "Invalid backend_url".to_string(), } - } - Err(err) => { - error!("Error verifying int_fp_jwt: {}", err); - ErrorResponse { error: err }.to_bytes() + .to_bytes() } } - } + Err(err) => { + error!("Error verifying int_fp_jwt: {}", err); + ErrorResponse { error: err }.to_bytes() + } + }, } } _ => { @@ -306,10 +336,15 @@ impl ProxyHttp for ForwardProxy { } if error_response_bytes.len() > 0 { - error!("[RESPONSE] Error: {}", utils::bytes_to_string(&error_response_bytes)); + error!( + "[RESPONSE] Error: {}", + utils::bytes_to_string(&error_response_bytes) + ); let header = ResponseHeader::build(StatusCode::BAD_REQUEST, None)?; session.write_response_header_ref(&header).await?; - session.write_response_body(Some(Bytes::from(error_response_bytes)), true).await?; + session + .write_response_body(Some(Bytes::from(error_response_bytes)), true) + .await?; session.set_keepalive(None); return Ok(true); } @@ -317,6 +352,7 @@ impl ProxyHttp for ForwardProxy { Ok(false) } + #[cfg_attr(feature = "hotpath", hotpath::measure)] async fn request_body_filter( &self, session: &mut Session, @@ -362,9 +398,9 @@ impl ProxyHttp for ForwardProxy { handler_response.status, utils::bytes_to_string(&handler_response.body.unwrap_or_default()) ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(handler_response.status)), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(handler_response.status), + ))); } info!( @@ -381,6 +417,7 @@ impl ProxyHttp for ForwardProxy { Ok(()) } + #[cfg_attr(feature = "hotpath", hotpath::measure)] async fn upstream_request_filter( &self, session: &mut Session, @@ -392,16 +429,19 @@ impl ProxyHttp for ForwardProxy { { match session.req_header().uri.path() { "/proxy" => { - match upstream_request.headers.get(HeaderKeys::IntFpJwtKey.as_str()) { + match upstream_request + .headers + .get(HeaderKeys::IntFpJwtKey.as_str()) + { None => { error!( "[REQUEST {}] Missing required header: {}", session.request_summary(), HeaderKeys::IntFpJwtKey.as_str() ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(StatusCode::BAD_REQUEST)), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::BAD_REQUEST), + ))); } Some(token) => { let token_str = token.to_str().or_err( @@ -415,15 +455,18 @@ impl ProxyHttp for ForwardProxy { session.request_summary(), HeaderKeys::IntFpJwtKey.as_str() ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(StatusCode::BAD_REQUEST)), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::BAD_REQUEST), + ))); } match self.handler.verify_int_fp_jwt(token_str) { Ok(session) => { upstream_request - .insert_header(HeaderKeys::FpRpJwtKey.as_str(), session.fp_rp_jwt) + .insert_header( + HeaderKeys::FpRpJwtKey.as_str(), + session.fp_rp_jwt, + ) .unwrap_or_default(); upstream_request.remove_header(HeaderKeys::IntFpJwtKey.as_str()); } @@ -434,12 +477,10 @@ impl ProxyHttp for ForwardProxy { HeaderKeys::IntFpJwtKey.as_str(), err ); - return Err( - pingora::Error::explain( - pingora::ErrorType::InvalidHTTPHeader, - err, - ) - ); + return Err(pingora::Error::explain( + pingora::ErrorType::InvalidHTTPHeader, + err, + )); } } } @@ -458,13 +499,13 @@ impl ProxyHttp for ForwardProxy { Ok(()) } + #[cfg_attr(feature = "hotpath", hotpath::measure)] async fn response_filter( &self, _session: &mut Session, upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX, - ) -> pingora::Result<()> - { + ) -> pingora::Result<()> { upstream_response.insert_header("Access-Control-Allow-Origin", "*")?; upstream_response.insert_header("Access-Control-Allow-Methods", "POST")?; upstream_response.insert_header("Access-Control-Allow-Headers", "*")?; @@ -479,6 +520,7 @@ impl ProxyHttp for ForwardProxy { Ok(()) } + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn response_body_filter( &self, session: &mut Session, @@ -523,11 +565,9 @@ impl ProxyHttp for ForwardProxy { handler_response.status, utils::bytes_to_string(&handler_response.body.unwrap_or_default()) ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus( - u16::from(StatusCode::INTERNAL_SERVER_ERROR) - ), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::INTERNAL_SERVER_ERROR), + ))); } info!( diff --git a/pingora-router/Cargo.toml b/pingora-router/Cargo.toml index 4da2e2f..feb199b 100644 --- a/pingora-router/Cargo.toml +++ b/pingora-router/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -log = "0.4.27" +tracing = "0.1" futures = "0.3.31" pingora = { version = "0.5.0", features = ["lb", "boringssl"] } serde = "1.0.219" diff --git a/pingora-router/src/ctx.rs b/pingora-router/src/ctx.rs index b1a0608..84a5601 100644 --- a/pingora-router/src/ctx.rs +++ b/pingora-router/src/ctx.rs @@ -1,7 +1,7 @@ -use std::collections::HashMap; +use crate::utils::get_request_body; use pingora::http::{Method, RequestHeader}; use pingora::proxy::Session; -use crate::utils::get_request_body; +use std::collections::HashMap; /* * Each type in this crate serves a specific purpose and may be updated as requirements evolve. @@ -20,10 +20,16 @@ pub struct Layer8ContextRequestSummary { impl Layer8ContextRequestSummary { pub(crate) fn from(session: &Session) -> Self { let method = session.req_header().method.clone(); - let scheme = session.req_header().uri.scheme() + let scheme = session + .req_header() + .uri + .scheme() .map(|s| s.to_string()) .unwrap_or_else(|| "".to_string()); - let host = session.req_header().uri.host() + let host = session + .req_header() + .uri + .host() .map(|h| h.to_string()) .unwrap_or_else(|| "".to_string()); let path = session.req_header().uri.path().to_string(); @@ -109,11 +115,10 @@ impl Layer8Context { pub async fn read_request_body(&mut self, session: &mut Session) -> pingora::Result { match get_request_body(session).await { Ok(body) => self.request.body = body, - Err(err) => return Err(err) + Err(err) => return Err(err), }; Ok(true) } - } impl Layer8ContextTrait for Layer8Context { @@ -134,8 +139,10 @@ impl Layer8ContextTrait for Layer8Context { fn set_request_header(&mut self, header: RequestHeader) { for (key, val) in header.headers.iter() { - self.request.header.insert(key.to_string(), val.to_str().unwrap_or("").to_string()); - }; + self.request + .header + .insert(key.to_string(), val.to_str().unwrap_or("").to_string()); + } } fn get_request_header(&self) -> &Layer8Header { @@ -143,7 +150,9 @@ impl Layer8ContextTrait for Layer8Context { } fn insert_response_header(&mut self, key: &str, val: &str) { - self.response.header.insert(key.to_lowercase().to_string(), val.to_string()); + self.response + .header + .insert(key.to_lowercase().to_string(), val.to_string()); } fn remove_response_header(&mut self, key: &str) -> Option { diff --git a/pingora-router/src/handler.rs b/pingora-router/src/handler.rs index e2f3413..828271d 100644 --- a/pingora-router/src/handler.rs +++ b/pingora-router/src/handler.rs @@ -1,9 +1,9 @@ -use std::fmt::Debug; -use pingora::http::StatusCode; use crate::ctx::Layer8Context; use futures::future::BoxFuture; +use pingora::http::StatusCode; use serde::de::Deserialize; use serde::ser::Serialize; +use std::fmt::Debug; /* * Each type in this crate has a specific purpose and may be updated as requirements evolve. @@ -41,7 +41,9 @@ use serde::ser::Serialize; /// async move { h.handle(ctx).await }.boxed() /// }); /// ``` -pub type APIHandler = Box Fn(&'a T, &'a mut Layer8Context) -> BoxFuture<'a, APIHandlerResponse> + Send + Sync>; +pub type APIHandler = Box< + dyn for<'a> Fn(&'a T, &'a mut Layer8Context) -> BoxFuture<'a, APIHandlerResponse> + Send + Sync, +>; /// `APIHandlerResponse` contains information returned by handlers and can be /// shared across handlers during request processing. @@ -68,13 +70,15 @@ pub trait ResponseBodyTrait: Serialize + for<'de> Deserialize<'de> + Debug { serde_json::to_vec(self).unwrap() } - fn from_bytes(bytes: Vec) -> Result, serde_json::Error> { - serde_json::from_slice(&bytes) + fn from_bytes(bytes: &[u8]) -> Result, serde_json::Error> { + serde_json::from_slice(bytes) } /// Override this method to handle error serialization if your handler implements /// the `DefaultHandler` trait. - fn from_json_err(_err: serde_json::Error) -> Option {None} + fn from_json_err(_err: serde_json::Error) -> Option { + None + } } /// `RequestBodyTrait` provides a default method to deserialize the request body bytes @@ -89,8 +93,8 @@ pub trait RequestBodyTrait: Serialize + for<'de> Deserialize<'de> + Debug { serde_json::to_vec(self).unwrap() } - fn from_bytes(bytes: Vec) -> Result, serde_json::Error> { - serde_json::from_slice(&bytes) + fn from_bytes(bytes: &[u8]) -> Result, serde_json::Error> { + serde_json::from_slice(bytes) } } @@ -107,11 +111,14 @@ pub trait RequestBodyTrait: Serialize + for<'de> Deserialize<'de> + Debug { /// If deserialization fails, it returns no body, an error response of type `E: impl /// ResponseBodyTrait` (constructed from the JSON error), and a 400 Bad Request status. pub trait DefaultHandlerTrait { - fn parse_request_body(data: &Vec) -> Result> + fn parse_request_body(data: &[u8]) -> Result> + where + T: RequestBodyTrait, + E: ResponseBodyTrait, { - match T::from_bytes(data.clone()) { + match T::from_bytes(data) { Ok(body) => Ok(*body), - Err(e) => Err(E::from_json_err(e)) + Err(e) => Err(E::from_json_err(e)), } } -} \ No newline at end of file +} diff --git a/pingora-router/src/utils.rs b/pingora-router/src/utils.rs index 7c2ced0..90eb609 100644 --- a/pingora-router/src/utils.rs +++ b/pingora-router/src/utils.rs @@ -1,16 +1,14 @@ -use log::error; use pingora::prelude::Session; +use tracing::error; pub(crate) async fn get_request_body(session: &mut Session) -> pingora::Result> { let mut body = Vec::new(); loop { match session.read_request_body().await { - Ok(option) => { - match option { - Some(chunk) => body.extend_from_slice(&chunk), - None => break, - } - } + Ok(option) => match option { + Some(chunk) => body.extend_from_slice(&chunk), + None => break, + }, Err(err) => { error!("ERROR: {err}"); return Err(err); @@ -18,4 +16,4 @@ pub(crate) async fn get_request_body(session: &mut Session) -> pingora::Result log::LevelFilter { - match self.log_level.to_uppercase().as_str() { - "INFO" => log::LevelFilter::Info, - "DEBUG" => log::LevelFilter::Debug, - "WARNING" => log::LevelFilter::Warn, - "ERROR" => log::LevelFilter::Error, - "TRACE" => log::LevelFilter::Trace, - "OFF" => log::LevelFilter::Off, - _ => log::max_level() - } - } -} +// impl LogConfig { +// pub fn to_level_filter(&self) -> tracing::LevelFilter { +// match self.log_level.to_uppercase().as_str() { +// "INFO" => log::LevelFilter::Info, +// "DEBUG" => log::LevelFilter::Debug, +// "WARNING" => log::LevelFilter::Warn, +// "ERROR" => log::LevelFilter::Error, +// "TRACE" => log::LevelFilter::Trace, +// "OFF" => log::LevelFilter::Off, +// _ => log::max_level(), +// } +// } +// } #[derive(Debug, Deserialize, Clone)] pub(super) struct ServerConfig { pub listen_address: String, #[serde(deserialize_with = "utils::deserializer::string_to_number")] - pub listen_port: u16 + pub listen_port: u16, } #[derive(Debug, Deserialize, Clone)] diff --git a/reverse-proxy/src/handler/init_tunnel/handler.rs b/reverse-proxy/src/handler/init_tunnel/handler.rs index 8898a63..bb001e2 100644 --- a/reverse-proxy/src/handler/init_tunnel/handler.rs +++ b/reverse-proxy/src/handler/init_tunnel/handler.rs @@ -1,11 +1,11 @@ -use log::{error, info}; +use crate::handler::common::consts::INIT_TUNNEL_TO_BACKEND_PATH; +use crate::handler::common::types::ErrorResponse; +use crate::handler::init_tunnel::{InitEncryptedTunnelRequest, InitTunnelRequestToBackend}; use pingora::http::StatusCode; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, ResponseBodyTrait}; use reqwest::Client; -use crate::handler::common::consts::INIT_TUNNEL_TO_BACKEND_PATH; -use crate::handler::common::types::ErrorResponse; -use crate::handler::init_tunnel::{InitEncryptedTunnelRequest, InitTunnelRequestToBackend}; +use tracing::{error, info}; /// Struct containing only associated methods (no instance methods or fields) pub(crate) struct InitTunnelHandler {} @@ -13,21 +13,21 @@ pub(crate) struct InitTunnelHandler {} impl DefaultHandlerTrait for InitTunnelHandler {} impl InitTunnelHandler { + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub(crate) async fn validate_request_body( ctx: &mut Layer8Context, backend_url: String, - ) -> Result - { + ) -> Result { return match InitTunnelHandler::parse_request_body::< InitEncryptedTunnelRequest, - ErrorResponse + ErrorResponse, >(&ctx.get_request_body()) { Ok(res) => Ok(res), Err(err) => { let body = match err { None => None, - Some(err_response) => Some(err_response.to_bytes()) + Some(err_response) => Some(err_response.to_bytes()), }; InitTunnelHandler::send_result_to_be(backend_url, false).await; @@ -41,9 +41,7 @@ impl InitTunnelHandler { } pub(crate) async fn send_result_to_be(backend_url: String, result: bool) { - let body = InitTunnelRequestToBackend { - success: result, - }; + let body = InitTunnelRequestToBackend { success: result }; let request_url = format!("{backend_url}{INIT_TUNNEL_TO_BACKEND_PATH}"); @@ -51,17 +49,24 @@ impl InitTunnelHandler { info!("{log_meta} request to BE body: {:?}", body); let client = Client::new(); - match client.post(request_url) + match client + .post(request_url) .header("Content-Type", "application/json") .json(&body) .send() .await { Ok(res) => { - info!("{log_meta} Response sending init-tunnel result to BE: {:?}", res) + info!( + "{log_meta} Response sending init-tunnel result to BE: {:?}", + res + ) } Err(err) => { - error!("{log_meta} Error sending init-tunnel result to BE: {:?}", err) + error!( + "{log_meta} Error sending init-tunnel result to BE: {:?}", + err + ) } } } diff --git a/reverse-proxy/src/handler/mod.rs b/reverse-proxy/src/handler/mod.rs index 1e3676d..7fe55d9 100644 --- a/reverse-proxy/src/handler/mod.rs +++ b/reverse-proxy/src/handler/mod.rs @@ -1,23 +1,23 @@ -use std::collections::HashMap; -use std::sync::{Mutex, MutexGuard}; -use log::debug; +use crate::config::{HandlerConfig, RPConfig}; +use crate::handler::healthcheck::{RpHealthcheckError, RpHealthcheckSuccess}; +use init_tunnel::InitEncryptedTunnelResponse; +use init_tunnel::handler::InitTunnelHandler; use ntor::common::{InitSessionMessage, NTorParty}; use ntor::server::NTorServer; use pingora::http::StatusCode; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use pingora_router::handler::{APIHandlerResponse, ResponseBodyTrait}; -use init_tunnel::handler::InitTunnelHandler; use proxy::handler::ProxyHandler; -use init_tunnel::InitEncryptedTunnelResponse; -use utils::{new_uuid}; +use std::collections::HashMap; +use std::sync::{Mutex, MutexGuard}; +use tracing::debug; use utils::jwt::JWTClaims; -use crate::config::{HandlerConfig, RPConfig}; -use crate::handler::healthcheck::{RpHealthcheckError, RpHealthcheckSuccess}; +use utils::new_uuid; mod common; +mod healthcheck; mod init_tunnel; mod proxy; -mod healthcheck; thread_local! { // @@ -42,6 +42,7 @@ impl ReverseHandler { } } + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn get_ntor_shared_secret(&self, session_id: String) -> Result, APIHandlerResponse> { let shared_secret = NTOR_SHARED_SECRETS.with(|memory| { let guard = memory.lock().unwrap(); @@ -50,24 +51,23 @@ impl ReverseHandler { return match shared_secret { Some(secret) => Ok(secret.clone()), - None => { - Err(APIHandlerResponse { - status: StatusCode::UNAUTHORIZED, - body: Some("Invalid or expired nTor session ID".as_bytes().to_vec()), - }) - } + None => Err(APIHandlerResponse { + status: StatusCode::UNAUTHORIZED, + body: Some("Invalid or expired nTor session ID".as_bytes().to_vec()), + }), }; } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub async fn handle_init_tunnel(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { // validate request body - let request_body = match InitTunnelHandler::validate_request_body( - ctx, - self.config.backend_url.clone(), - ).await { - Ok(res) => res, - Err(res) => return res - }; + let request_body = + match InitTunnelHandler::validate_request_body(ctx, self.config.backend_url.clone()) + .await + { + Ok(res) => res, + Err(res) => return res, + }; debug!("[REQUEST /init-tunnel] Parsed body: {:?}", request_body); // todo I think there are prettier ways to use nTor since we are free to modify the nTor crate, but I'm lazy @@ -113,7 +113,10 @@ impl ReverseHandler { NTOR_SHARED_SECRETS.with(|memory| { let mut guard: MutexGuard>> = memory.lock().unwrap(); - guard.insert(ntor_session_id, ntor_server.get_shared_secret().unwrap_or_default()); + guard.insert( + ntor_session_id, + ntor_server.get_shared_secret().unwrap_or_default(), + ); }); APIHandlerResponse { @@ -122,8 +125,8 @@ impl ReverseHandler { } } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub async fn handle_proxy_request(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { - // validate request headers (nTor session ID) let session_id = match ProxyHandler::validate_request_headers(ctx, &self.jwt_secret) { Ok(session_id) => session_id, @@ -155,34 +158,39 @@ impl ReverseHandler { let wrapped_response = match ProxyHandler::rebuild_user_request( self.config.backend_url.clone(), wrapped_request, - ).await { + ) + .await + { Ok(res) => res, Err(res) => return res, }; - debug!("[RESPONSE /proxy] Wrapped Backend response: {:?}", wrapped_response); + debug!( + "[RESPONSE /proxy] Wrapped Backend response: {:?}", + wrapped_response + ); return match ProxyHandler::encrypt_response_body( wrapped_response, self.config.ntor_server_id.clone(), shared_secret, ) { - Ok(encrypted_message) => { - APIHandlerResponse { - status: StatusCode::OK, - body: Some(encrypted_message.to_bytes()), - } - } - Err(res) => res + Ok(encrypted_message) => APIHandlerResponse { + status: StatusCode::OK, + body: Some(encrypted_message.to_bytes()), + }, + Err(res) => res, }; } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub async fn handle_healthcheck(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { if let Some(error) = ctx.param("error") { if error == "true" { let response_bytes = RpHealthcheckError { - rp_healthcheck_error: "this is placeholder for a custom error".to_string() - }.to_bytes(); + rp_healthcheck_error: "this is placeholder for a custom error".to_string(), + } + .to_bytes(); ctx.insert_response_header("x-rp-healthcheck-error", "response-header-error"); return APIHandlerResponse { @@ -194,7 +202,8 @@ impl ReverseHandler { let response_bytes = RpHealthcheckSuccess { rp_healthcheck_success: "this is placeholder for a custom body".to_string(), - }.to_bytes(); + } + .to_bytes(); ctx.insert_response_header("x-rp-healthcheck-success", "response-header-success"); @@ -203,4 +212,4 @@ impl ReverseHandler { body: Some(response_bytes), }; } -} \ No newline at end of file +} diff --git a/reverse-proxy/src/handler/proxy/handler.rs b/reverse-proxy/src/handler/proxy/handler.rs index 9134363..7cffc4a 100644 --- a/reverse-proxy/src/handler/proxy/handler.rs +++ b/reverse-proxy/src/handler/proxy/handler.rs @@ -1,16 +1,16 @@ -use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; -use reqwest::header::HeaderMap; -use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, ResponseBodyTrait}; -use log::{debug, error}; +use crate::handler::common::consts::HeaderKeys; +use crate::handler::common::types::ErrorResponse; +use crate::handler::proxy::{EncryptedMessage, L8RequestObject, L8ResponseObject}; use ntor::common::NTorParty; use ntor::server::NTorServer; -use reqwest::Client; use pingora::http::StatusCode; +use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; +use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, ResponseBodyTrait}; +use reqwest::Client; +use reqwest::header::HeaderMap; +use tracing::{debug, error}; use utils::bytes_to_json; use utils::jwt::JWTClaims; -use crate::handler::common::consts::{HeaderKeys}; -use crate::handler::common::types::ErrorResponse; -use crate::handler::proxy::{EncryptedMessage, L8ResponseObject, L8RequestObject}; /// Struct containing only associated methods (no instance methods or fields) pub struct ProxyHandler {} @@ -18,28 +18,34 @@ pub struct ProxyHandler {} impl DefaultHandlerTrait for ProxyHandler {} impl ProxyHandler { - + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn validate_jwt_token( ctx: &mut Layer8Context, header_key: HeaderKeys, - jwt_secret: &Vec + jwt_secret: &Vec, ) -> Result { match ctx.get_request_header().get(header_key.as_str()) { None => { return Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, - body: Some(ErrorResponse { - error: format!("Missing {} header", header_key.as_str()), - }.to_bytes()), + body: Some( + ErrorResponse { + error: format!("Missing {} header", header_key.as_str()), + } + .to_bytes(), + ), }); - }, + } Some(token) => { if token.is_empty() { return Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, - body: Some(ErrorResponse { - error: format!("Empty {} header", header_key.as_str()), - }.to_bytes()), + body: Some( + ErrorResponse { + error: format!("Empty {} header", header_key.as_str()), + } + .to_bytes(), + ), }); } @@ -50,28 +56,31 @@ impl ProxyHandler { error!("Error verifying {} token: {:?}", header_key.as_str(), err); Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, - body: Some(ErrorResponse { - error: err.to_string(), - }.to_bytes()), + body: Some( + ErrorResponse { + error: err.to_string(), + } + .to_bytes(), + ), }) - }, + } } } } } /// Validates the request headers and get ntor_session_id in return. + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub(crate) fn validate_request_headers( ctx: &mut Layer8Context, jwt_secret: &Vec, - ) -> Result - { + ) -> Result { // verify fp_rp_jwt header match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::FpRpJwtKey, jwt_secret) { Ok(_claims) => { // todo!() nothing to validate at the moment } - Err(err) => return Err(err) + Err(err) => return Err(err), } return match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::IntRpJwtKey, jwt_secret) { @@ -81,24 +90,26 @@ impl ProxyHandler { Some(ntor_session_id) => Ok(ntor_session_id), None => Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, - body: Some(ErrorResponse { - error: "Missing ntor_session_id in JWT claims".to_string(), - }.to_bytes()), + body: Some( + ErrorResponse { + error: "Missing ntor_session_id in JWT claims".to_string(), + } + .to_bytes(), + ), }), } } - Err(err) => return Err(err) + Err(err) => return Err(err), }; } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub(crate) fn validate_request_body( - ctx: &mut Layer8Context - ) -> Result - { - match ProxyHandler::parse_request_body::< - EncryptedMessage, - ErrorResponse - >(&ctx.get_request_body()) { + ctx: &mut Layer8Context, + ) -> Result { + match ProxyHandler::parse_request_body::( + &ctx.get_request_body(), + ) { Ok(res) => Ok(res), Err(err) => { let body = match err { @@ -116,57 +127,69 @@ impl ProxyHandler { } } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub(crate) fn decrypt_request_body( request_body: EncryptedMessage, ntor_server_id: String, shared_secret: Vec, - ) -> Result - { + ) -> Result { let mut ntor_server = NTorServer::new(ntor_server_id); ntor_server.set_shared_secret(shared_secret.clone()); // Decrypt the request body using nTor shared secret - let decrypted_data = ntor_server.decrypt(ntor::common::EncryptedMessage { - nonce: <[u8; 12]>::try_from(request_body.nonce).unwrap_or_default(), - data: request_body.data, - }).map_err(|err| { - return APIHandlerResponse { - status: StatusCode::BAD_REQUEST, - body: Some(format!("Decryption failed: {}", err).as_bytes().to_vec()), - }; - })?; - // let decrypted_data = request_body.data; - - // parse decrypted data into WrappedUserRequest - let wrapped_request: L8RequestObject = bytes_to_json(decrypted_data) + let decrypted_data = ntor_server + .decrypt(ntor::common::EncryptedMessage { + nonce: <[u8; 12]>::try_from(request_body.nonce).unwrap_or_default(), + data: request_body.data, + }) .map_err(|err| { return APIHandlerResponse { status: StatusCode::BAD_REQUEST, - body: Some(format!("Failed to parse request body: {}", err).as_bytes().to_vec()), + body: Some(format!("Decryption failed: {}", err).as_bytes().to_vec()), }; })?; + // let decrypted_data = request_body.data; + + // parse decrypted data into WrappedUserRequest + let wrapped_request: L8RequestObject = bytes_to_json(decrypted_data).map_err(|err| { + return APIHandlerResponse { + status: StatusCode::BAD_REQUEST, + body: Some( + format!("Failed to parse request body: {}", err) + .as_bytes() + .to_vec(), + ), + }; + })?; Ok(wrapped_request) } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub(crate) async fn rebuild_user_request( backend_url: String, - wrapped_request: L8RequestObject - ) -> Result - { + wrapped_request: L8RequestObject, + ) -> Result { let header_map = utils::hashmap_to_headermap(&wrapped_request.headers) .unwrap_or_else(|_| HeaderMap::new()); - debug!("[FORWARD {}] Reconstructed request headers: {:?}", wrapped_request.uri, header_map); + debug!( + "[FORWARD {}] Reconstructed request headers: {:?}", + wrapped_request.uri, header_map + ); let origin_url = format!("{}{}", backend_url, wrapped_request.uri); - debug!("[FORWARD {}] Request URL: {}", wrapped_request.uri, origin_url); + debug!( + "[FORWARD {}] Request URL: {}", + wrapped_request.uri, origin_url + ); let client = Client::new(); - let response = client.request( - wrapped_request.method.parse().unwrap_or_default(), - origin_url.as_str(), - ) + let response = client + .request( + wrapped_request.method.parse().unwrap_or_default(), + origin_url.as_str(), + ) .headers(header_map.clone()) .body(wrapped_request.body) .send() @@ -175,7 +198,8 @@ impl ProxyHandler { return match response { Ok(success_res) => { let status = success_res.status().as_u16(); - let status_text = success_res.status() + let status_text = success_res + .status() .canonical_reason() .unwrap_or("OK") .to_string(); @@ -188,8 +212,7 @@ impl ProxyHandler { debug!( "[FORWARD {}] Response from backend headers: {:?}", - wrapped_request.uri, - serialized_headers + wrapped_request.uri, serialized_headers ); debug!( "[FORWARD {}] Response from backend body: {}", @@ -209,7 +232,9 @@ impl ProxyHandler { } Err(err) => { error!("[FORWARD] Error while building request to BE: {:?}", err); - let status = err.status().unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR); + let status = err + .status() + .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR); let err_body = ErrorResponse { error: format!("Backend error: {}", status), }; @@ -222,12 +247,12 @@ impl ProxyHandler { }; } + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub(crate) fn encrypt_response_body( response_body: L8ResponseObject, ntor_server_id: String, shared_secret: Vec, - ) -> Result - { + ) -> Result { let mut ntor_server = NTorServer::new(ntor_server_id); ntor_server.set_shared_secret(shared_secret); diff --git a/reverse-proxy/src/main.rs b/reverse-proxy/src/main.rs index dfa4b42..7816c0a 100644 --- a/reverse-proxy/src/main.rs +++ b/reverse-proxy/src/main.rs @@ -1,65 +1,78 @@ -mod handler; -mod proxy; -mod tls_conf; +use std::sync::Arc; -use std::fs::OpenOptions; -use crate::handler::ReverseHandler; -use env_logger::{self, Env, Target}; use futures::FutureExt; use pingora::server::Server; use pingora::server::configuration::Opt; use pingora::{listeners::tls::TlsSettings, prelude::http_proxy_service}; use pingora_router::handler::APIHandler; use pingora_router::router::Router; -use std::sync::Arc; -use log::{debug, error}; +use tracing::{debug, error}; + use crate::config::RPConfig; +use crate::handler::ReverseHandler; use crate::proxy::ReverseProxy; mod config; +mod handler; +mod proxy; +mod tls_conf; fn load_config() -> RPConfig { // Load environment variables from .env file dotenv::dotenv().ok(); // Deserialize from env vars - let config: RPConfig = envy::from_env().map_err(|e| { - error!("Failed to load configuration: {}", e); - }).unwrap(); - - let target = match config.log.log_path.as_str() { - "console" => Target::Stdout, - path => { - let file = OpenOptions::new() - .append(true) - .create(true) - .open(path) - .expect("Can't create log file!"); - - Target::Pipe(Box::new(file)) - } - }; - - env_logger::Builder::from_env(Env::default() - .write_style_or("RUST_LOG_STYLE", "always")) - .format_file(true) - .format_line_number(true) - .filter(None, config.log.to_level_filter()) - .target(target) - .init(); + let config: RPConfig = envy::from_env() + .map_err(|e| { + error!("Failed to load configuration: {}", e); + }) + .unwrap(); + + // let target = match config.log.log_path.as_str() { + // "console" => {Target::Stdout,} + // path => { + // // let file = OpenOptions::new() + // // .append(true) + // // .create(true) + // // .open(path) + // // .expect("Can't create log file!"); + + // // Target::Pipe(Box::new(file)) + // } + // }; + + // env_logger::Builder::from_env(Env::default().write_style_or("RUST_LOG_STYLE", "always")) + // .format_file(true) + // .format_line_number(true) + // .filter(None, config.log.to_level_filter()) + // .target(target) + // .init(); debug!("Loaded ReverseProxyConfig: {:?}", config); config } fn main() { + #[cfg(feature = "hotpath")] + let guard = hotpath::GuardBuilder::new("guard_timeout::main") + .percentiles(&[50, 95, 99]) + .build(); + + #[cfg(feature = "hotpath")] + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(60)); + drop(guard); + std::process::exit(0); + }); + // Load environment variables from .env file let rp_config = load_config(); let mut my_server = Server::new(Some(Opt { conf: std::env::var("SERVER_CONF").ok(), ..Default::default() - })).unwrap(); + })) + .unwrap(); my_server.bootstrap(); let handle_init_tunnel: APIHandler> = @@ -77,26 +90,22 @@ fn main() { router.post("/proxy".to_string(), Box::new([handle_proxy])); router.get("/healthcheck".to_string(), Box::new([handle_healthcheck])); - let mut my_proxy = http_proxy_service( - &my_server.configuration, - ReverseProxy::new(router), - ); + let mut my_proxy = http_proxy_service(&my_server.configuration, ReverseProxy::new(router)); if rp_config.tls.enable_tls { my_proxy.add_tls_with_settings( &format!( "{}:{}", - rp_config.server.listen_address, - rp_config.server.listen_port + rp_config.server.listen_address, rp_config.server.listen_port ), None, - TlsSettings::with_callbacks(Box::new(rp_config.tls)).expect("Cannot set TlsSettings callbacks") + TlsSettings::with_callbacks(Box::new(rp_config.tls)) + .expect("Cannot set TlsSettings callbacks"), ); } else { my_proxy.add_tcp(&format!( "{}:{}", - rp_config.server.listen_address, - rp_config.server.listen_port + rp_config.server.listen_address, rp_config.server.listen_port )); } diff --git a/reverse-proxy/src/proxy.rs b/reverse-proxy/src/proxy.rs index 0b11655..af7d6bf 100644 --- a/reverse-proxy/src/proxy.rs +++ b/reverse-proxy/src/proxy.rs @@ -1,37 +1,39 @@ -use pingora::prelude::{HttpPeer, ProxyHttp}; -use pingora::proxy::Session; -use pingora::http::{ResponseHeader, StatusCode}; -use log::{error, info}; use async_trait::async_trait; use bytes::Bytes; +use pingora::http::{ResponseHeader, StatusCode}; +use pingora::prelude::{HttpPeer, ProxyHttp}; +use pingora::proxy::Session; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use pingora_router::router::Router; +use tracing::{error, info}; pub struct ReverseProxy { - router: Router + router: Router, } impl ReverseProxy { pub fn new(router: Router) -> Self { - ReverseProxy { - router - } + ReverseProxy { router } } async fn set_headers( session: &mut Session, ctx: &mut Layer8Context, - response_status: StatusCode + response_status: StatusCode, ) -> pingora::Result<()> { let mut header = ResponseHeader::build(response_status, None)?; let response_header = ctx.get_response_header().clone(); for (key, val) in response_header.iter() { - header.insert_header(key.clone(), val.clone()).unwrap_or_default(); - }; + header + .insert_header(key.clone(), val.clone()) + .unwrap_or_default(); + } // Common headers - header.insert_header("Content-Type", "application/json").unwrap_or_default(); + header + .insert_header("Content-Type", "application/json") + .unwrap_or_default(); header .insert_header("Access-Control-Allow-Origin", "*") .unwrap_or_default(); @@ -46,8 +48,12 @@ impl ReverseProxy { .unwrap_or_default(); println!(); - info!("[RESPONSE {} {}] Header: {:?}", session.req_header().method, - session.req_header().uri.to_string(), header.headers); + info!( + "[RESPONSE {} {}] Header: {:?}", + session.req_header().method, + session.req_header().uri.to_string(), + header.headers + ); session.write_response_header_ref(&header).await } } @@ -65,13 +71,17 @@ impl ProxyHttp for ReverseProxy { _session: &mut Session, _ctx: &mut Self::CTX, ) -> pingora::Result> { - let peer: Box = - Box::new(HttpPeer::new("", false, "".to_string())); + let peer: Box = Box::new(HttpPeer::new("", false, "".to_string())); Ok(peer) } /// Handle request/response data by creating a new request to BE and respond to FP - async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> pingora::Result + #[cfg_attr(feature = "hotpath", hotpath::measure)] + async fn request_filter( + &self, + session: &mut Session, + ctx: &mut Self::CTX, + ) -> pingora::Result where Self::CTX: Send + Sync, { @@ -81,7 +91,11 @@ impl ProxyHttp for ReverseProxy { let request_summary = session.request_summary(); println!(); info!("[REQUEST {}] {:?}", request_summary, ctx.request); - info!("[REQUEST {}] Decoded body: {}", request_summary, String::from_utf8_lossy(&*ctx.get_request_body())); + info!( + "[REQUEST {}] Decoded body: {}", + request_summary, + String::from_utf8_lossy(&*ctx.get_request_body()) + ); println!(); let handler_response = self.router.call_handler(ctx).await; @@ -99,11 +113,17 @@ impl ProxyHttp for ReverseProxy { }; ReverseProxy::::set_headers(session, ctx, handler_response.status).await?; - info!("[RESPONSE {}] Body: {}", request_summary, String::from_utf8_lossy(&*response_bytes)); + info!( + "[RESPONSE {}] Body: {}", + request_summary, + String::from_utf8_lossy(&*response_bytes) + ); println!(); // Write the response body to the session after setting headers - session.write_response_body(Some(Bytes::from(response_bytes)), true).await?; + session + .write_response_body(Some(Bytes::from(response_bytes)), true) + .await?; Ok(true) } @@ -118,12 +138,8 @@ impl ProxyHttp for ReverseProxy { .response_written() .map_or(0, |resp| resp.status.as_u16()); - if !e.is_none() { - error!( - "{} error: {}", - self.request_summary(session, ctx), - e.as_ref().unwrap_or_default() - ); + if let Some(e) = e { + error!("{} error: {}", self.request_summary(session, ctx), e); } info!( diff --git a/reverse-proxy/src/tls_conf.rs b/reverse-proxy/src/tls_conf.rs index fea85c1..3111a9c 100644 --- a/reverse-proxy/src/tls_conf.rs +++ b/reverse-proxy/src/tls_conf.rs @@ -2,9 +2,9 @@ use boring::{ pkey::{PKey, Public}, ssl::{SslAlert, SslRef, SslVerifyError, SslVerifyMode}, }; -use log::debug; use pingora::{listeners::TlsAccept, protocols::tls::TlsRef}; use serde::Deserialize; +use tracing::{debug, error}; #[derive(Debug, Deserialize, Clone)] pub struct TlsConfig { @@ -21,7 +21,7 @@ impl TlsAccept for TlsConfig { // set the hostname for the SSL context ssl.set_hostname("localhost") .inspect_err(|e| { - log::error!("Failed to set hostname: {}", e); + error!("Failed to set hostname: {}", e); }) .unwrap(); @@ -29,12 +29,12 @@ impl TlsAccept for TlsConfig { { let key = PKey::private_key_from_pem(&self.key.clone().into_bytes()) .inspect_err(|e| { - log::error!("Failed to parse server private key: {}", e); + error!("Failed to parse server private key: {}", e); }) .unwrap(); ssl.set_private_key(&key) .inspect_err(|e| { - log::error!("Failed to set server private key: {}", e); + error!("Failed to set server private key: {}", e); }) .unwrap(); } @@ -43,13 +43,13 @@ impl TlsAccept for TlsConfig { { let cert = boring::x509::X509::from_pem(&self.cert.clone().into_bytes()) .inspect_err(|e| { - log::error!("Failed to parse server certificate: {}", e); + error!("Failed to parse server certificate: {}", e); }) .unwrap(); ssl.set_certificate(&cert) .inspect_err(|e| { - log::error!("Failed to set server certificate: {}", e); + error!("Failed to set server certificate: {}", e); }) .unwrap(); } @@ -57,13 +57,13 @@ impl TlsAccept for TlsConfig { // the CA certificate is used to verify the client certificate let ca_cert = boring::x509::X509::from_pem(&self.ca_cert.clone().into_bytes()) .inspect_err(|e| { - log::error!("Failed to parse CA certificate: {}", e); + error!("Failed to parse CA certificate: {}", e); }) .unwrap(); ssl.set_custom_verify_callback( SslVerifyMode::PEER, - Self::verify_callback(ca_cert.public_key().unwrap_or_default()), + Self::verify_callback(ca_cert.public_key().unwrap()), ); } } @@ -82,14 +82,14 @@ impl TlsConfig { ssl: &mut TlsRef, ) -> Result<(), SslVerifyError> { if ssl.verify_mode() != SslVerifyMode::PEER { - log::error!("SSL verify mode is not set to PEER, cannot verify client certificate"); + error!("SSL verify mode is not set to PEER, cannot verify client certificate"); return Err(SslVerifyError::Invalid(SslAlert::INTERNAL_ERROR)); } let client_cert = match ssl.peer_certificate() { Some(val) => val, None => { - log::error!("Failed to get client certificate"); + error!("Failed to get client certificate"); return Err(SslVerifyError::Invalid(SslAlert::NO_CERTIFICATE)); } }; @@ -98,8 +98,11 @@ impl TlsConfig { debug!("Debug Client certificate: {:?}", client_cert.subject_name()); // Verify the client certificate against the server's CA - if !client_cert.verify(&server_ca_public_key).unwrap_or_default() { - log::error!("Client certificate verification failed"); + if !client_cert + .verify(&server_ca_public_key) + .unwrap_or_default() + { + error!("Client certificate verification failed"); return Err(SslVerifyError::Invalid(SslAlert::BAD_CERTIFICATE)); } debug!("Client certificate verification succeeded"); diff --git a/utils/Cargo.toml b/utils/Cargo.toml index ce69d8b..5335662 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -4,12 +4,15 @@ version = "0.1.0" edition = "2024" [dependencies] -reqwest = { version="0.11", default-features=false, features=["json", "rustls-tls"] } +reqwest = { version = "0.11", default-features = false, features = [ + "json", + "rustls-tls", +] } uuid = { version = "1.16.0", features = ["v4"] } serde_json = "1.0.140" serde = { version = "1.0.219", features = ["derive"] } base64 = "0.21.7" -log = "0.4.27" +tracing = "0.1" chrono = "0.4.40" jsonwebtoken = "9.3.1" url = "2.5.4" diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 7fd1d08..38c6555 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -11,7 +11,7 @@ use uuid::Uuid; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; -use log::error; +use tracing::error; pub fn to_reqwest_header(map: HashMap) -> HeaderMap { let mut header_map = HeaderMap::new();