diff --git a/.gitignore b/.gitignore index 847c65c..c970dc3 100644 --- a/.gitignore +++ b/.gitignore @@ -28,7 +28,7 @@ test/cypress/screenshots **/Cargo.lock -docker/layer8-volumes/influxdb2-data +docker/influxdb2-data docker/layer8-volumes/pg-data certs/ntor/*.pem diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 684f117..b38d5d4 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -53,7 +53,7 @@ services: depends_on: auth-postgres: condition: service_healthy - auth-influxdb2: + influxdb2: condition: service_healthy auth-telegraf: condition: service_healthy @@ -64,14 +64,22 @@ services: layer8-network: ipv4_address: 10.10.10.103 - auth-influxdb2: - container_name: auth-influxdb2 + influxdb2: + container_name: influxdb2 image: influxdb:2 restart: always ports: - 8086:8086 volumes: - - ./layer8-volumes/influxdb2-data:/var/lib/influxdb2 + - ./influxdb2-data:/var/lib/influxdb2 + environment: + - DOCKER_INFLUXDB_INIT_MODE=setup + - DOCKER_INFLUXDB_INIT_USERNAME=admin + - DOCKER_INFLUXDB_INIT_PASSWORD=12341234 + - DOCKER_INFLUXDB_INIT_ORG=globeandcitizen + - DOCKER_INFLUXDB_INIT_BUCKET=layer8-bucket + - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=layer8-admin-token + - DOCKER_INFLUXDB_INIT_RETENTION=30d healthcheck: test: [ "CMD", "curl", "-f", "http://localhost:8086/health" ] interval: 5s @@ -135,7 +143,7 @@ services: depends_on: auth-postgres: condition: service_healthy - auth-influxdb2: + influxdb2: condition: service_healthy auth-telegraf: condition: service_healthy diff --git a/docker/layer8-volumes/.env.docker b/docker/layer8-volumes/.env.docker index eec8182..0387a41 100644 --- a/docker/layer8-volumes/.env.docker +++ b/docker/layer8-volumes/.env.docker @@ -22,9 +22,9 @@ INFLUXDB_URL=http://10.10.10.104:8086 INFLUXDB_URL_TELEGRAF=http://host.docker.internal:8086 INFLUXDB_USERNAME=admin INFLUXDB_PASSWORD=somethingthatyoudontknow -INFLUXDB_ORG=layer8 -INFLUXDB_BUCKET=layer8 -INFLUXDB_TOKEN=DEFAULT_TOKEN_FOR_TESTING +INFLUXDB_ORG=globeandcitizen +INFLUXDB_BUCKET=layer8-bucket +INFLUXDB_TOKEN=layer8-admin-token OTEL_EXPORTER_OTLP_ENDPOINT=10.10.10.105:4317 CREATE_TEST_USER=true diff --git a/forward-proxy/.env.dev b/forward-proxy/.env.dev index 31b3166..5b1546f 100644 --- a/forward-proxy/.env.dev +++ b/forward-proxy/.env.dev @@ -5,7 +5,12 @@ PATH_TO_SERVER_CONF="../server_conf.yml" # Logging configurations LOG_LEVEL=trace +# default to "json" if not "plain" +LOG_FORMAT=plain +# "console" or folder path LOG_PATH=console +# leave empty if LOG_PATH is console +LOG_FILENAME=forward-proxy.log # Handler configurations JWT_VIRTUAL_CONNECTION_KEY=secret @@ -19,3 +24,8 @@ CA_CERT="-----BEGIN CERTIFICATE-----\nMIIF0zCCA7ugAwIBAgIUQbJs4Jhd2NiSeL0Y+SZ4z7 CERT="-----BEGIN CERTIFICATE-----\nMIIFtTCCA52gAwIBAgIUXKLNUFsfUc3xSpmCKj35dAP3oe8wDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MTAyNVoXDTM1MDkyMjE5\nMTAyNVowdDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRYwFAYDVQQDDA1mb3J3YXJkLXByb3h5MIICIjANBgkqhkiG9w0BAQEFAAOC\nAg8AMIICCgKCAgEAq4JLgH1rmbSe2/6VFydzdxGsq0oDE9cKOagWxdd9lFqen+LL\nB+vDFV6d0n+CHDkhLsgi5ASGhSl5o232FI/Tgf4YqHQ0C3touGs2ysuI51WT2hzV\nPOFHemgveHNKk+yVHabZp5ypm4kYgMegknhFAtKytAC4g6IcHUX+t9gz+ucA0D8C\n+kuYJiaVd2IML8rF4RCGruZ6WuOcLeHF7byfucT/r9tT3IhdbHtLWOOw6mmtRuAl\nUexiXN+ipxmaAgSyAZrR9CwOo2xp9p5XL2Mn3VKHa6AGZya8QobnN16Dm+JOCoIl\n/C8b7w2H9mC6/GA2kmdxuD0+MyWFnK9DrbT+VYBgubXHeyKAR9mddn0ur7WNBeMl\ntNKufyS9J0Od3S8dERjw05Fl7jGJJj82B1zR/bKbq3v3IIteeTHm6sea2fV0vIw1\n2MBUF5ax60Jdeo71vHf4CVg01F3LBb9ar1LRVNgvrqThach6CDbvHPICXEFHV9OP\nil8PHIgUwon5iySfzfjPxIF0EnkGUR7B5iFYlcG1vSvbSW30sGQ67a9RWBfIll1m\nON1PhLZ5rnslgj6KxNReheaLSe/u4f17QxMrwLw/W3vwOm2SymUcMpwCAaLYtxd+\n6W6zaKy9g0wXeUaEbKBzaOdLqixJ4mL6NVYjEqlXJ5BuQ25iauafpRJz/vcCAwEA\nAaNCMEAwHQYDVR0OBBYEFHrJpAogeNfUj4GhwcrHVYMzXhMeMB8GA1UdIwQYMBaA\nFJOQTjwIZb078bHcHzxNnkUiCKW1MA0GCSqGSIb3DQEBCwUAA4ICAQBdQuPBEGma\nU4puro0xFlpIeOVOUf7eVMHeJuBCiDmBr8C+kae9XuShRYP/OMgGxQ7Y95BJo/C7\nTMwF5ugDQrsRod4EaTEO8ZXGcZq91v4YfegAJ9gQpR/OdniO5dDxfPMlY/VZEyK9\nwHIaUVUP+fiWk9yDwuTPZz+0xF3x2oujBO0JZTsHAbGaZ0CtNW1M5JwFF6aVD5Fs\nPave34wfDEbUg/7WKnZy6pNoxlkgdEsS1QThQ817pE9tTpGJS+9r+JjTSg9AL/W9\nQTvBB96WUsrRqJ2V+XXZ6FjDzPmWC4cfABh24S1NIdygB6uUJ6WLg8HN9tIjH+6O\noDvxCBjUE1444O6c9Z6cdxBvdICrIRN9wc1WhspBqRf4nXtRidFyrV2KDW9TMQXZ\nMOGXrhqgC1lwtKTc2iT8x74qQe8ARSArKNeTjJEiGwbc7+9wG8lf0b0zKWxwlTmg\nmAy8jVEkHmLvkZqKmaW59wUxkaAiBO2TNwjiwDkwPY8ET6DyGd/x4oG6QdocKF87\naOetHnMjTSpUj01sKFpveyTqoAF0UNilmJsUE2AO0BHsxj7Pe+ysLx1yUPiuKJX2\nUMHK/obbGgwu/5BGDXoafQlYu77ZyL/4oqnGYk/s7IGJaw/yVOrmGuuaF0h8RJfV\nOYxd3wTVC8r46A49VExTqvuYdwaAiovdaw==\n-----END CERTIFICATE-----" KEY="-----BEGIN PRIVATE KEY-----\nMIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQCrgkuAfWuZtJ7b\n/pUXJ3N3EayrSgMT1wo5qBbF132UWp6f4ssH68MVXp3Sf4IcOSEuyCLkBIaFKXmj\nbfYUj9OB/hiodDQLe2i4azbKy4jnVZPaHNU84Ud6aC94c0qT7JUdptmnnKmbiRiA\nx6CSeEUC0rK0ALiDohwdRf632DP65wDQPwL6S5gmJpV3YgwvysXhEIau5npa45wt\n4cXtvJ+5xP+v21PciF1se0tY47Dqaa1G4CVR7GJc36KnGZoCBLIBmtH0LA6jbGn2\nnlcvYyfdUodroAZnJrxChuc3XoOb4k4KgiX8LxvvDYf2YLr8YDaSZ3G4PT4zJYWc\nr0OttP5VgGC5tcd7IoBH2Z12fS6vtY0F4yW00q5/JL0nQ53dLx0RGPDTkWXuMYkm\nPzYHXNH9spure/cgi155Mebqx5rZ9XS8jDXYwFQXlrHrQl16jvW8d/gJWDTUXcsF\nv1qvUtFU2C+upOFpyHoINu8c8gJcQUdX04+KXw8ciBTCifmLJJ/N+M/EgXQSeQZR\nHsHmIViVwbW9K9tJbfSwZDrtr1FYF8iWXWY43U+EtnmueyWCPorE1F6F5otJ7+7h\n/XtDEyvAvD9be/A6bZLKZRwynAIBoti3F37pbrNorL2DTBd5RoRsoHNo50uqLEni\nYvo1ViMSqVcnkG5DbmJq5p+lEnP+9wIDAQABAoICACmQw1GZk9lFf/abJXDeG8qw\nmuNMZaCKTi0ZAqPiDMpGiAkBwujhh38HVkJsqpDCe7tFv8b5Hczp91PXU3s6PC1V\n8o3o07AwsXl4amgNmdlO0S1cLYW6p0MQOuj7MAjXnm/4PumzOxu5xxl2yACXa0o6\n3Bppzk4AnMWvcAMIP9i/4V+W1dbpOS+NzE2JkqCGiRx5j9qVevPKE9C+1eQ/AYrZ\nJoptIk7hMZsX6nPZgsfc4qS5r/HB0zjk7huHRd7VWnqvFdESWF3c6XVefIy8gC3Q\nUYeQ2dxn89o/rYuquUSvPPCpCCGtHRz7b4cTfF2rx64FqfbXyNpGbrJBe6p+oeCb\nRnB0vnEmiEIvw4C0P0rGz9nOS69TaphTfRFVmDOFgwYDNzZz3DD/YdsQY1XMQ6pH\n+m4Nr7+EPjFk6AYSgvbmRLL8WGGI6Tv9/ezVvdflPClYLMjOGIyVPRfOldgad01T\nlI/vWDHITWJZqHwh4VBZvQaUeoRPcuabMeBIV3d7FBcsQqOwRbpbL/Piu8ydZJ8Y\n+5gUYfh/F7ouJGyuFm77R2uEhSjpbJMxu1Mku2SqG2gAime5YJBRxExUAXky9QyO\nXic4XxX67iFA4P3ATkFA5kQF3m+1uuwfYC3gIs+aiZuAuQuZ4OO5Ay3TxFCMeuTk\nfsS+COfEhsdXUG7Wn2aZAoIBAQDYClEGkfeftTvyXYbISRGeG/Nni3UUS4aNSIsv\nRNlAKnD+n60VwiBpOTr6IEP1MB1eseQvvNVqiemb5Oa8jaDV8Gkon0uBHTBQo62K\nFIRMsM8jsMGvO0RRtd/2iWRu1WnzqzA5TXX2sN/whjxeT0DUrnGKsqUI6PT8STjj\nQo/IHT7uulJPnbVZ+G+CZyQKjaBaOOy6VMYsDXhMm71zNQkM2AgzOYSLwpqtSAHM\nwHWHGkZGvetsVL6hHcLcWVxkuo7saeMcLMPfYBLkV8oX/HayZ0oc+x2RYslQwokq\ng7oFEsvQjb166MXD5gSaOjAkjb9FshBsxU5XKC3gA1xqbOo1AoIBAQDLO2Izu1Nu\nhsJ4UMWgfeYSf9/i8T5CXGYs9VytYNCFpBc530RQ4olu+SY9vrUt3+OW3xeDtWX/\nMm/pq0DxpjkvD5y5fYnsBI2Cd3o3L6WKEsqKnBOJbS5omrcwWSOO18z+8+BF9O55\n+RTR8dWHw+XxvajFTqzcLojtvExyvRwKI3WVaNtZWgU7YmsVxoQ93/PIYIKgdbhI\nv+tbc8DuxnbDA+qJhFipuTI07fclE/hupmgmd2sDiFjiyPbk85dXpLsxnAhlE9ZT\n2YSBhVQz0a68CRiVUcoWv7FsMKlgJtfICmzDH3qrB00Dp7FhXinlubrgMWy+kw5v\n6k/h2WqZhIn7AoIBAQDMOPRfQZzTXH9OnRrMOkZtL/7n4uzKQpru86SfCnZUMcqe\n6FK2Psxkq7UUvWuAW/tniMIsXlVgYP50X+2+UCO6GYlO3UaCxxTlJdTmsn5eAMXO\n90ggXeY3V7ZfV4GZRCTkMu9jO9ZHXOxUcpCelkyywDSU6EsaIR11X8JnEoTYpszW\n30rv+CV252KB4v4u+7KZlzYw7fJnslQGFzL/tSLZAV6/DaA+fbe6FledNlHjZPMJ\n7H6f6XxK0ddidRbiIXj6Ax6tg6OlhSxWrqZcBkwuWXW176wDw16K+Vqw1dUC9sG0\nZEi551EL3mR7ZoYcB+LH/4uHRvzHZzP2jzbNZCgBAoIBAQCEuVN41W23UOrQCHAI\nUDBhBIICg+pVDGLuGY9c601C+dbxRI4pBMkcYDpJOLK6Mu0/KpMAwQbLkvTjdaQE\nLLpLsbZ4rTPVn2OLQNvgDo3djkgYHbXkmhkk12WrfYtrTiPinQJqrXrQzYp7UaRR\n9e3F4kbGFItvgDSMjdyfUkFtnZq86K3XvKKOFcg5gFv8zLU4t06X3EltuWjLYN0v\nEw2cboJNLNF6hifzyTUOUex81tBNzs9kjzb9ZKFZBHxiEILv8ybIXBwsxnFy5NAI\nx3eF9arIWZHRKX+FWIJE+RkS2zwMchJ6f1ocePeuzwAttw4EPEL4crGLBUsGBCdJ\n+vThAoIBAQC1KrUB170d08W6WMdK8sbzL67M5pj8B44yp6650nNAVJhXMRuw846X\nH1KqUzV1elQdmvKJDogkUYllRVByHGX05Lr5BGAkmDxBMqYxUn56IhEfQ18ejy6x\nzUFwg4hLLWZzNqkIwGNb5B9wFzHFy39vwIlsy1mMIswxjucsH/lENRaPF2Lr5Tia\n1VRDJqXSZ2ytGyKsgBhFXsZUXmnxBNmjFuI62ROhOlrYJ3bwfTRDwkqQQWxPDj7f\nfTK7R5D69flh4CBEM6oSIZm7KEMXW/C5AkYc/jjW3/bDNuVy7JpZ/aokTxHCqW8e\n7zyoZqd4bAj7xvPuj8lMdThXtKSz86hK\n-----END PRIVATE KEY-----" +# influxDB +INFLUXDB_URL=http://localhost:8086 +INFLUXDB_ORG=layer8 +INFLUXDB_BUCKET=layer8 +INFLUXDB_AUTH_TOKEN=DEFAULT_TOKEN_FOR_TESTING diff --git a/forward-proxy/.env.docker b/forward-proxy/.env.docker index 6afbcb9..9b03c53 100644 --- a/forward-proxy/.env.docker +++ b/forward-proxy/.env.docker @@ -5,9 +5,13 @@ PATH_TO_SERVER_CONF="../server_conf.yml" # not yet used # Logging configurations LOG_LEVEL=trace +# default to "json" if not "plain" +LOG_FORMAT=plain +# can be set to 'console' or a directory path +#LOG_PATH=/var/log/layer8 LOG_PATH=console -# can be set to 'console' or a file path -#LOG_PATH=/var/log/layer8/log.txt +# leave empty if LOG_PATH is console +LOG_FILENAME=forward-proxy.log # Handler configurations JWT_VIRTUAL_CONNECTION_KEY=secret @@ -20,3 +24,9 @@ ENABLE_TLS=true CA_CERT="-----BEGIN CERTIFICATE-----\nMIIF0zCCA7ugAwIBAgIUQbJs4Jhd2NiSeL0Y+SZ4z7W6l7swDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MDQ1OFoXDTM1MDkyMjE5\nMDQ1OFowcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRMwEQYDVQQDDAptVExTUm9vdENBMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A\nMIICCgKCAgEAmQsqgvtgfiQvhaziGzWMAZRzCViScRCrwkvmeE2U6Q/etk1PacuH\ndkJAvDQgLBsXCThg6G0hjw6BHX/i5seBQ6XHBNXEhAR70X3Y9VDCOoOLbqI22XAH\nZrX5gswBUzXbxxbxPrc7lK8N8s2M/LS4ADo9QWu4gtDnfr2eEWQR1P7o8llIn3z+\nGJStvzD0Jlfa39Gv9sIjPsoe7hvSpDiYqwGYDpeBLxn2SU39tyyyxDOH5NrpHG0z\nYMh20kK/0qkiVknd7/oqbSIv1EKfLLnw7p09LmsaHhlR+w+kQt6Z2Dzvae1qMj/D\nWnf1fgBOThZdnfUDznFGLjhPpGIvyjB8zk16wgYW7OIslqqbPPSLWm3ls3qpU0pl\nn2ROOWSLufXobh0Ly8vLLGVN8mwYeaTyXK4ELC0ScrpzBJbQ0mN3xulCMulk54iZ\nLy8KA1DtiwWyyV1cCvb36kfEPavU9zCYUgjvpVp3+fato0DBqe4ukMBUlb5C9qQ0\nKEkz/z75yWAe6bSVqXnYVuIihoxumM6oRMJwfOeOU0yyIfIkhwmDxw+yqeUDVibw\nseq5Ul4qVulocSAYPUX4ym/rPqC0SI+Z3EJZSLbiwjh1jmMiLbrsguYROSIOFrVO\noyYmjCUnc5Dxs04Q8VK80NuDFxCVEaK19esxFzCcFYDTiXoocem6EccCAwEAAaNj\nMGEwHQYDVR0OBBYEFJOQTjwIZb078bHcHzxNnkUiCKW1MB8GA1UdIwQYMBaAFJOQ\nTjwIZb078bHcHzxNnkUiCKW1MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQD\nAgGGMA0GCSqGSIb3DQEBCwUAA4ICAQAyCYD4IShYuU66OQAPz0/P4rVUr+mfKX0s\nYhnWRY+hLm4M0zp51/H9XGnHUZwGf3No3I0vWFDZHZShKt/GemCZy/WOJzjDGH71\ncNraMm/Bn2c7RXEqzK0qaKdgVo4vrS9inGExGtTRywEc+fLgH+myVR+Dy6JfxpeW\nFarMpu5AyoAWYRwUVaEQXotBF/XZHetaYr1mk9p3qNQkTEnv8zOWFF5Sv5R0I3XM\nABqk5Lz2WyRJIIL0g/9TdWZF88KrQhnr1q9JnTgAg3aTvyTR8c+Oik+BlRUe1zw1\n7TUvuVJkR50uEKZ0IVdWGtNCEzPl1PPMxzUAomozhZRh7vhuBk+yThaDByinaSvC\nxVzufk3hjrnPA0+ZyD4nVcMs3oivl5Vxli/KJrdYiQoErkbTmEgDCzSm0g/14BvJ\nHmlpITgolQGA1SFQCJcrioYn1Eyv4hSrnQtwP+t3t/G4OHbbEeu2pgqvZKEMngNL\naUGla0fwJtytoAqspWxuFwYxJWl5cse1xpsbrf5aiDphqCfSZARr8jzeYtYF8sex\n/8yzy1YmgDlSLng0kFqjLgP8SHLEgKMCOCP/haAHBaDqIu/9FMPofqlG0YJuqTB3\ndf0zohxxcIDZn17qElOVCRoUQVoz2rbYKHHZ8/MVWM7lPmUIqt2G6HlcMo1xYUa4\nfgCQMqEubg==\n-----END CERTIFICATE-----" CERT="-----BEGIN CERTIFICATE-----\nMIIFtTCCA52gAwIBAgIUXKLNUFsfUc3xSpmCKj35dAP3oe8wDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MTAyNVoXDTM1MDkyMjE5\nMTAyNVowdDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRYwFAYDVQQDDA1mb3J3YXJkLXByb3h5MIICIjANBgkqhkiG9w0BAQEFAAOC\nAg8AMIICCgKCAgEAq4JLgH1rmbSe2/6VFydzdxGsq0oDE9cKOagWxdd9lFqen+LL\nB+vDFV6d0n+CHDkhLsgi5ASGhSl5o232FI/Tgf4YqHQ0C3touGs2ysuI51WT2hzV\nPOFHemgveHNKk+yVHabZp5ypm4kYgMegknhFAtKytAC4g6IcHUX+t9gz+ucA0D8C\n+kuYJiaVd2IML8rF4RCGruZ6WuOcLeHF7byfucT/r9tT3IhdbHtLWOOw6mmtRuAl\nUexiXN+ipxmaAgSyAZrR9CwOo2xp9p5XL2Mn3VKHa6AGZya8QobnN16Dm+JOCoIl\n/C8b7w2H9mC6/GA2kmdxuD0+MyWFnK9DrbT+VYBgubXHeyKAR9mddn0ur7WNBeMl\ntNKufyS9J0Od3S8dERjw05Fl7jGJJj82B1zR/bKbq3v3IIteeTHm6sea2fV0vIw1\n2MBUF5ax60Jdeo71vHf4CVg01F3LBb9ar1LRVNgvrqThach6CDbvHPICXEFHV9OP\nil8PHIgUwon5iySfzfjPxIF0EnkGUR7B5iFYlcG1vSvbSW30sGQ67a9RWBfIll1m\nON1PhLZ5rnslgj6KxNReheaLSe/u4f17QxMrwLw/W3vwOm2SymUcMpwCAaLYtxd+\n6W6zaKy9g0wXeUaEbKBzaOdLqixJ4mL6NVYjEqlXJ5BuQ25iauafpRJz/vcCAwEA\nAaNCMEAwHQYDVR0OBBYEFHrJpAogeNfUj4GhwcrHVYMzXhMeMB8GA1UdIwQYMBaA\nFJOQTjwIZb078bHcHzxNnkUiCKW1MA0GCSqGSIb3DQEBCwUAA4ICAQBdQuPBEGma\nU4puro0xFlpIeOVOUf7eVMHeJuBCiDmBr8C+kae9XuShRYP/OMgGxQ7Y95BJo/C7\nTMwF5ugDQrsRod4EaTEO8ZXGcZq91v4YfegAJ9gQpR/OdniO5dDxfPMlY/VZEyK9\nwHIaUVUP+fiWk9yDwuTPZz+0xF3x2oujBO0JZTsHAbGaZ0CtNW1M5JwFF6aVD5Fs\nPave34wfDEbUg/7WKnZy6pNoxlkgdEsS1QThQ817pE9tTpGJS+9r+JjTSg9AL/W9\nQTvBB96WUsrRqJ2V+XXZ6FjDzPmWC4cfABh24S1NIdygB6uUJ6WLg8HN9tIjH+6O\noDvxCBjUE1444O6c9Z6cdxBvdICrIRN9wc1WhspBqRf4nXtRidFyrV2KDW9TMQXZ\nMOGXrhqgC1lwtKTc2iT8x74qQe8ARSArKNeTjJEiGwbc7+9wG8lf0b0zKWxwlTmg\nmAy8jVEkHmLvkZqKmaW59wUxkaAiBO2TNwjiwDkwPY8ET6DyGd/x4oG6QdocKF87\naOetHnMjTSpUj01sKFpveyTqoAF0UNilmJsUE2AO0BHsxj7Pe+ysLx1yUPiuKJX2\nUMHK/obbGgwu/5BGDXoafQlYu77ZyL/4oqnGYk/s7IGJaw/yVOrmGuuaF0h8RJfV\nOYxd3wTVC8r46A49VExTqvuYdwaAiovdaw==\n-----END CERTIFICATE-----" KEY="-----BEGIN PRIVATE KEY-----\nMIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQCrgkuAfWuZtJ7b\n/pUXJ3N3EayrSgMT1wo5qBbF132UWp6f4ssH68MVXp3Sf4IcOSEuyCLkBIaFKXmj\nbfYUj9OB/hiodDQLe2i4azbKy4jnVZPaHNU84Ud6aC94c0qT7JUdptmnnKmbiRiA\nx6CSeEUC0rK0ALiDohwdRf632DP65wDQPwL6S5gmJpV3YgwvysXhEIau5npa45wt\n4cXtvJ+5xP+v21PciF1se0tY47Dqaa1G4CVR7GJc36KnGZoCBLIBmtH0LA6jbGn2\nnlcvYyfdUodroAZnJrxChuc3XoOb4k4KgiX8LxvvDYf2YLr8YDaSZ3G4PT4zJYWc\nr0OttP5VgGC5tcd7IoBH2Z12fS6vtY0F4yW00q5/JL0nQ53dLx0RGPDTkWXuMYkm\nPzYHXNH9spure/cgi155Mebqx5rZ9XS8jDXYwFQXlrHrQl16jvW8d/gJWDTUXcsF\nv1qvUtFU2C+upOFpyHoINu8c8gJcQUdX04+KXw8ciBTCifmLJJ/N+M/EgXQSeQZR\nHsHmIViVwbW9K9tJbfSwZDrtr1FYF8iWXWY43U+EtnmueyWCPorE1F6F5otJ7+7h\n/XtDEyvAvD9be/A6bZLKZRwynAIBoti3F37pbrNorL2DTBd5RoRsoHNo50uqLEni\nYvo1ViMSqVcnkG5DbmJq5p+lEnP+9wIDAQABAoICACmQw1GZk9lFf/abJXDeG8qw\nmuNMZaCKTi0ZAqPiDMpGiAkBwujhh38HVkJsqpDCe7tFv8b5Hczp91PXU3s6PC1V\n8o3o07AwsXl4amgNmdlO0S1cLYW6p0MQOuj7MAjXnm/4PumzOxu5xxl2yACXa0o6\n3Bppzk4AnMWvcAMIP9i/4V+W1dbpOS+NzE2JkqCGiRx5j9qVevPKE9C+1eQ/AYrZ\nJoptIk7hMZsX6nPZgsfc4qS5r/HB0zjk7huHRd7VWnqvFdESWF3c6XVefIy8gC3Q\nUYeQ2dxn89o/rYuquUSvPPCpCCGtHRz7b4cTfF2rx64FqfbXyNpGbrJBe6p+oeCb\nRnB0vnEmiEIvw4C0P0rGz9nOS69TaphTfRFVmDOFgwYDNzZz3DD/YdsQY1XMQ6pH\n+m4Nr7+EPjFk6AYSgvbmRLL8WGGI6Tv9/ezVvdflPClYLMjOGIyVPRfOldgad01T\nlI/vWDHITWJZqHwh4VBZvQaUeoRPcuabMeBIV3d7FBcsQqOwRbpbL/Piu8ydZJ8Y\n+5gUYfh/F7ouJGyuFm77R2uEhSjpbJMxu1Mku2SqG2gAime5YJBRxExUAXky9QyO\nXic4XxX67iFA4P3ATkFA5kQF3m+1uuwfYC3gIs+aiZuAuQuZ4OO5Ay3TxFCMeuTk\nfsS+COfEhsdXUG7Wn2aZAoIBAQDYClEGkfeftTvyXYbISRGeG/Nni3UUS4aNSIsv\nRNlAKnD+n60VwiBpOTr6IEP1MB1eseQvvNVqiemb5Oa8jaDV8Gkon0uBHTBQo62K\nFIRMsM8jsMGvO0RRtd/2iWRu1WnzqzA5TXX2sN/whjxeT0DUrnGKsqUI6PT8STjj\nQo/IHT7uulJPnbVZ+G+CZyQKjaBaOOy6VMYsDXhMm71zNQkM2AgzOYSLwpqtSAHM\nwHWHGkZGvetsVL6hHcLcWVxkuo7saeMcLMPfYBLkV8oX/HayZ0oc+x2RYslQwokq\ng7oFEsvQjb166MXD5gSaOjAkjb9FshBsxU5XKC3gA1xqbOo1AoIBAQDLO2Izu1Nu\nhsJ4UMWgfeYSf9/i8T5CXGYs9VytYNCFpBc530RQ4olu+SY9vrUt3+OW3xeDtWX/\nMm/pq0DxpjkvD5y5fYnsBI2Cd3o3L6WKEsqKnBOJbS5omrcwWSOO18z+8+BF9O55\n+RTR8dWHw+XxvajFTqzcLojtvExyvRwKI3WVaNtZWgU7YmsVxoQ93/PIYIKgdbhI\nv+tbc8DuxnbDA+qJhFipuTI07fclE/hupmgmd2sDiFjiyPbk85dXpLsxnAhlE9ZT\n2YSBhVQz0a68CRiVUcoWv7FsMKlgJtfICmzDH3qrB00Dp7FhXinlubrgMWy+kw5v\n6k/h2WqZhIn7AoIBAQDMOPRfQZzTXH9OnRrMOkZtL/7n4uzKQpru86SfCnZUMcqe\n6FK2Psxkq7UUvWuAW/tniMIsXlVgYP50X+2+UCO6GYlO3UaCxxTlJdTmsn5eAMXO\n90ggXeY3V7ZfV4GZRCTkMu9jO9ZHXOxUcpCelkyywDSU6EsaIR11X8JnEoTYpszW\n30rv+CV252KB4v4u+7KZlzYw7fJnslQGFzL/tSLZAV6/DaA+fbe6FledNlHjZPMJ\n7H6f6XxK0ddidRbiIXj6Ax6tg6OlhSxWrqZcBkwuWXW176wDw16K+Vqw1dUC9sG0\nZEi551EL3mR7ZoYcB+LH/4uHRvzHZzP2jzbNZCgBAoIBAQCEuVN41W23UOrQCHAI\nUDBhBIICg+pVDGLuGY9c601C+dbxRI4pBMkcYDpJOLK6Mu0/KpMAwQbLkvTjdaQE\nLLpLsbZ4rTPVn2OLQNvgDo3djkgYHbXkmhkk12WrfYtrTiPinQJqrXrQzYp7UaRR\n9e3F4kbGFItvgDSMjdyfUkFtnZq86K3XvKKOFcg5gFv8zLU4t06X3EltuWjLYN0v\nEw2cboJNLNF6hifzyTUOUex81tBNzs9kjzb9ZKFZBHxiEILv8ybIXBwsxnFy5NAI\nx3eF9arIWZHRKX+FWIJE+RkS2zwMchJ6f1ocePeuzwAttw4EPEL4crGLBUsGBCdJ\n+vThAoIBAQC1KrUB170d08W6WMdK8sbzL67M5pj8B44yp6650nNAVJhXMRuw846X\nH1KqUzV1elQdmvKJDogkUYllRVByHGX05Lr5BGAkmDxBMqYxUn56IhEfQ18ejy6x\nzUFwg4hLLWZzNqkIwGNb5B9wFzHFy39vwIlsy1mMIswxjucsH/lENRaPF2Lr5Tia\n1VRDJqXSZ2ytGyKsgBhFXsZUXmnxBNmjFuI62ROhOlrYJ3bwfTRDwkqQQWxPDj7f\nfTK7R5D69flh4CBEM6oSIZm7KEMXW/C5AkYc/jjW3/bDNuVy7JpZ/aokTxHCqW8e\n7zyoZqd4bAj7xvPuj8lMdThXtKSz86hK\n-----END PRIVATE KEY-----" + +# influxDB +INFLUXDB_URL=http://localhost:8086 +INFLUXDB_ORG=layer8 +INFLUXDB_BUCKET=layer8 +INFLUXDB_AUTH_TOKEN=DEFAULT_TOKEN_FOR_TESTING diff --git a/forward-proxy/Cargo.toml b/forward-proxy/Cargo.toml index ec1ced1..d32c48a 100644 --- a/forward-proxy/Cargo.toml +++ b/forward-proxy/Cargo.toml @@ -9,9 +9,8 @@ bytes = "1.10.1" clap = { version = "3.2.25", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" -log = "0.4.27" chrono = "0.4.40" -reqwest = { version="0.11", default-features=false, features=["json", "rustls-tls"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } tokio = { version = "1.44.2", features = ["rt-multi-thread", "macros"] } pingora = { version = "0.5.0", features = ["lb", "boringssl"] } jsonwebtoken = "9.3.1" @@ -21,7 +20,9 @@ pingora-router = { path = "../pingora-router", version = "0.1.0" } once_cell = "1.21.3" pingora-error = "0.5.0" boring = "4.17.0" -env_logger = "0.11.8" utils = { path = "../utils", version = "0.1.0" } hex = "0.4.3" envy = "0.4.2" +tracing = "0.1.41" +influxdb2 = { version = "0.5.2", default-features = false, features = ["rustls"] } + diff --git a/forward-proxy/Dockerfile b/forward-proxy/Dockerfile index 2b9affd..7aaaf26 100644 --- a/forward-proxy/Dockerfile +++ b/forward-proxy/Dockerfile @@ -20,7 +20,6 @@ RUN apt-get update && apt-get install -y \ WORKDIR /usr/src/app # Copy source code -COPY ./certs ./certs COPY ./pingora-router ./pingora-router COPY ./utils ./utils COPY ./forward-proxy ./forward-proxy @@ -44,7 +43,6 @@ RUN useradd -m layer8 # Copy only the built binary from the builder stage COPY --from=builder /usr/src/app/forward-proxy/target/release/forward-proxy /usr/local/bin/forward-proxy -COPY --from=builder /usr/src/app/certs /usr/local/certs # Switch to non-root user USER layer8 diff --git a/forward-proxy/src/config.rs b/forward-proxy/src/config.rs index 92e7880..34cf625 100644 --- a/forward-proxy/src/config.rs +++ b/forward-proxy/src/config.rs @@ -11,13 +11,20 @@ pub struct FPConfig { #[serde(flatten)] pub tls_config: TlsConfig, #[serde(flatten)] // This flattens the HandlerConfig fields into this struct - pub handler_config: HandlerConfig + pub handler_config: HandlerConfig, + #[serde(flatten)] + pub influxdb_config: InfluxDBConfig, } #[derive(Debug, Deserialize)] pub struct LogConfig { - pub log_path: String, pub log_level: String, + /// default to "json" if not "plain" + pub log_format: String, + /// "console" or folder path + pub log_path: String, + /// required if log_path is not "console" + pub log_filename: String, } #[derive(Debug, Deserialize)] @@ -39,25 +46,10 @@ pub struct TlsConfig { pub key: String, } -impl LogConfig { - pub fn to_level_filter(&self) -> log::LevelFilter { - match self.log_level.to_uppercase().as_str() { - "INFO" => log::LevelFilter::Info, - "DEBUG" => log::LevelFilter::Debug, - "WARNING" => log::LevelFilter::Warn, - "ERROR" => log::LevelFilter::Error, - "TRACE" => log::LevelFilter::Trace, - "OFF" => log::LevelFilter::Off, - _ => log::max_level() - } - } -} - -impl TlsConfig { - pub fn load(&mut self) -> Result<(), String> { - // todo validate certs? - // this method was created to load certificates from files but now certs are directly in config. - // so it does nothing for now, but kept for future use to validate certs if needed - Ok(()) - } +#[derive(Debug, Deserialize)] +pub struct InfluxDBConfig { + pub influxdb_url: String, + pub influxdb_org: String, + pub influxdb_bucket: String, + pub influxdb_auth_token: String, } \ No newline at end of file diff --git a/forward-proxy/src/handler/consts.rs b/forward-proxy/src/handler/consts.rs index 1077394..9253f6e 100644 --- a/forward-proxy/src/handler/consts.rs +++ b/forward-proxy/src/handler/consts.rs @@ -1,43 +1,45 @@ -// can be replaced by constants, will see -pub enum HeaderKeys { - #[allow(dead_code)] - IntRpJwtKey, - IntFpJwtKey, - FpRpJwtKey -} +pub struct HeaderKeys; impl HeaderKeys { - pub fn as_str(&self) -> &'static str { - match self { - HeaderKeys::IntRpJwtKey => "int_rp_jwt", - HeaderKeys::IntFpJwtKey => "int_fp_jwt", - HeaderKeys::FpRpJwtKey => "fp_rp_jwt", - } - } + #[allow(dead_code)] + pub const INT_RP_JWT: &'static str = "int_rp_jwt"; + pub const INT_FP_JWT: &'static str = "int_fp_jwt"; + pub const FP_RP_JWT: &'static str = "fp_rp_jwt"; } -pub enum CtxKeys { - NTorServerId, - NTorStaticPublicKey, - UpstreamAddress, - UpstreamSNI, +pub struct CtxKeys; + +impl CtxKeys { + pub const NTOR_SERVER_ID: &'static str = "ntor_server_id"; + pub const NTOR_STATIC_PUBLIC_KEY: &'static str = "ntor_static_public_key"; + pub const UPSTREAM_ADDRESS: &'static str = "upstream_address"; + pub const UPSTREAM_SNI: &'static str = "upstream_sni"; #[allow(dead_code)] - IntRPJwt, + pub const INT_RP_JWT: &'static str = "int_rp_jwt"; #[allow(dead_code)] - IntFPJwt, - FpRpJwt, + pub const INT_FP_JWT: &'static str = "int_fp_jwt"; + pub const FP_RP_JWT: &'static str = "fp_rp_jwt"; + pub const BACKEND_AUTH_CLIENT_ID: &'static str = "backend_auth_client_id"; +} + +pub struct LogTypes; + +impl LogTypes { + pub const ACCESS_LOG: &'static str = "ACCESS_LOG"; + pub const ACCESS_LOG_RESULT: &'static str = "ACCESS_LOG_RESULT"; + pub const UPSTREAM_CONNECT: &'static str = "UPSTREAM_CONNECT"; + pub const HANDLE_CLIENT_REQUEST: &'static str = "HANDLE_CLIENT_REQUEST"; + pub const HANDLE_UPSTREAM_RESPONSE: &'static str = "HANDLE_UPSTREAM_RESPONSE"; + pub const HEALTHCHECK: &'static str = "HEALTHCHECK"; + pub const INFLUXDB: &'static str = "INFLUXDB"; + pub const AUTHENTICATION_SERVER: &'static str = "AUTHENTICATION_SERVER"; +} + +pub struct RequestPaths; + +impl RequestPaths { + pub const PROXY: &'static str = "/proxy"; + pub const INIT_TUNNEL: &'static str = "/init-tunnel"; + pub const HEALTHCHECK: &'static str = "/healthcheck"; } -impl CtxKeys { - pub fn to_string(&self) -> String { - match self { - CtxKeys::NTorServerId => "ntor_server_id".to_string(), - CtxKeys::NTorStaticPublicKey => "ntor_static_public_key".to_string(), - CtxKeys::UpstreamAddress => "upstream_address".to_string(), - CtxKeys::UpstreamSNI => "upstream_sni".to_string(), - CtxKeys::IntRPJwt => "int_rp_jwt".to_string(), - CtxKeys::IntFPJwt => "int_fp_jwt".to_string(), - CtxKeys::FpRpJwt => "fp_rp_jwt".to_string(), - } - } -} \ No newline at end of file diff --git a/forward-proxy/src/handler/mod.rs b/forward-proxy/src/handler/mod.rs index 3efd032..9c4805c 100644 --- a/forward-proxy/src/handler/mod.rs +++ b/forward-proxy/src/handler/mod.rs @@ -1,24 +1,29 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use log::{error, info}; + use pingora::http::StatusCode; use reqwest::Client; -use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; -use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait}; -use crate::handler::types::response::{ErrorResponse, FpHealthcheckError, FpHealthcheckSuccess, InitTunnelResponseFromRP, InitTunnelResponseToINT}; -use pingora_router::handler::ResponseBodyTrait; +use pingora_router::{ + ctx::{Layer8Context, Layer8ContextTrait}, + handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait, ResponseBodyTrait} +}; use serde::Deserialize; -use crate::handler::types::request::InitTunnelRequest; -use utils; -use utils::jwt::JWTClaims; +use tracing::{debug, error, info}; + +use crate::handler::types::{ + response::{ErrorResponse, FpHealthcheckError, FpHealthcheckSuccess, InitTunnelResponseFromRP, InitTunnelResponseToINT}, + request::InitTunnelRequest +}; +use utils::{self, jwt::JWTClaims}; use crate::config::HandlerConfig; +use crate::handler::consts::LogTypes; pub mod types; pub mod consts; pub struct ForwardHandler { pub config: HandlerConfig, - jwts_storage: Arc>>, + jwts_storage: Arc>>, // int_fp_jwt -> IntFPSession } impl DefaultHandlerTrait for ForwardHandler {} @@ -31,6 +36,7 @@ struct NTorServerCertificate { #[derive(Clone, Debug, Default)] pub struct IntFPSession { + pub client_id: String, pub rp_base_url: String, pub fp_rp_jwt: String, } @@ -49,22 +55,24 @@ impl ForwardHandler { ctx: &mut Layer8Context, ) -> Result { + let correlation_id = ctx.get_correlation_id(); + let client = Client::new(); - let res = client.get( - //todo - // the input backend_url is originally from interceptor request, - // the interceptor only accepts URLs with http(s) scheme. - // But the authentication server expects a URL without scheme - format!( - "{}{}", - self.config.auth_get_certificate_url, - backend_url.replace("http://", "").replace("https://", "") - ) - ) + //todo + // the input backend_url is originally from interceptor request, + // the interceptor only accepts URLs with http(s) scheme. + // But the authentication server expects a URL without scheme + let request_path = format!( + "{}{}", + self.config.auth_get_certificate_url, + backend_url.replace("http://", "").replace("https://", "") + ); + let res = client.get(&request_path) .header("Authorization", format!("Bearer {}", self.config.auth_access_token)) .send() .await + // unable to connect .map_err(|e| { let response_body = ErrorResponse { error: format!("Failed to connect to layer8: {}", e) @@ -76,11 +84,16 @@ impl ForwardHandler { } })?; + // connected but request failed if !res.status().is_success() { let response_body = ErrorResponse { error: format!("Failed to get public key from layer8, status code: {}", res.status().as_u16()), }; - error!("Sending error response: {:?}", response_body); + error!( + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, + "Failed to get ntor certificate for {request_path}: {response_body:?}" + ); ctx.insert_response_header("Connection", "close"); // Ensure connection closes??? @@ -92,26 +105,46 @@ impl ForwardHandler { #[derive(Deserialize, Debug)] struct AuthServerResponse { pub x509_certificate: String, + pub client_id: String, } - let cert: AuthServerResponse = res.json().await.map_err(|err| { - error!("Failed to parse authentication server response: {:?}", err); + let auth_res: AuthServerResponse = res.json().await.map_err(|err| { + error!( + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, + "Failed to parse authentication server response: {:?}", + err + ); APIHandlerResponse { status: StatusCode::INTERNAL_SERVER_ERROR, body: None, } })?; - let pub_key = utils::cert::extract_x509_pem(cert.x509_certificate.clone()) + // save `client_id` to ctx for later use + ctx.set(consts::CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string(), auth_res.client_id.clone()); + + let pub_key = utils::cert::extract_x509_pem(auth_res.x509_certificate.clone()) .map_err(|e| { - error!("Failed to parse x509 certificate: {:?}", e); + error!( + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, + "Failed to parse x509 certificate: {:?}", + e + ); APIHandlerResponse { status: StatusCode::INTERNAL_SERVER_ERROR, body: None, } })?; - info!("AuthenticationServer response: {:?}", cert); + debug!(%correlation_id, "AuthenticationServer response: {:?}", auth_res); + info!( + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, + "Obtained ntor credentials for backend_url: {}", + backend_url + ); Ok(NTorServerCertificate { server_id: backend_url, // todo I still prefer taking the server_id value from certificate's subject @@ -175,14 +208,14 @@ impl ForwardHandler { Ok(cert) => cert, Err(err) => return err }; - info!("Server certificate: {:?}", server_certificate); + debug!("Server certificate: {:?}", server_certificate); ctx.set( - consts::CtxKeys::NTorServerId.to_string(), + consts::CtxKeys::NTOR_SERVER_ID.to_string(), server_certificate.server_id, ); ctx.set( - consts::CtxKeys::NTorStaticPublicKey.to_string(), + consts::CtxKeys::NTOR_STATIC_PUBLIC_KEY.to_string(), hex::encode(server_certificate.public_key), ); } @@ -194,16 +227,21 @@ impl ForwardHandler { } pub fn handle_init_tunnel_response(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { - let ntor_server_id = ctx.get(&consts::CtxKeys::NTorServerId.to_string()).unwrap_or(&"".to_string()).clone(); + let ntor_server_id = ctx.get(&consts::CtxKeys::NTOR_SERVER_ID.to_string()).unwrap_or(&"".to_string()).clone(); let ntor_static_public_key = hex::decode( - ctx.get(&consts::CtxKeys::NTorStaticPublicKey.to_string()).clone().unwrap_or(&"".to_string()) + ctx.get(&consts::CtxKeys::NTOR_STATIC_PUBLIC_KEY.to_string()).clone().unwrap_or(&"".to_string()) ).unwrap_or_default(); let response_body = ctx.get_response_body(); return match utils::bytes_to_json::(response_body) { Err(e) => { - error!("Error parsing RP response: {:?}", e); + error!( + correlation_id=ctx.get_correlation_id(), + log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE, + "Error parsing RP response: {:?}", + e + ); APIHandlerResponse { status: StatusCode::INTERNAL_SERVER_ERROR, body: None, @@ -217,6 +255,7 @@ impl ForwardHandler { }; let int_fp_session = IntFPSession { + client_id: ctx.get(&consts::CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string()).unwrap_or(&"".to_string()).to_string(), rp_base_url: ctx.param("backend_url").unwrap_or(&"".to_string()).to_string(), fp_rp_jwt: res_from_rp.fp_rp_jwt, }; diff --git a/forward-proxy/src/main.rs b/forward-proxy/src/main.rs index fdabaf2..0b0e18a 100644 --- a/forward-proxy/src/main.rs +++ b/forward-proxy/src/main.rs @@ -1,53 +1,42 @@ mod proxy; mod handler; mod config; +mod statistics; -use std::fs::OpenOptions; use crate::handler::ForwardHandler; -use env_logger::{Env, Target}; -use log::{debug, info}; use proxy::ForwardProxy; use pingora::prelude::*; +use tokio::runtime::Runtime; use crate::config::FPConfig; +use tracing::{info, debug}; +use crate::statistics::Statistics; fn load_config() -> FPConfig { // Load environment variables from .env file dotenv::dotenv().ok(); // Deserialize from env vars - let mut config: FPConfig = envy::from_env().expect("Failed to load config"); + let config: FPConfig = envy::from_env().expect("Failed to load config"); - config.tls_config.load().expect("Failed to load TLS configuration"); - - let target = match config.log_config.log_path.as_str() { - "console" => Target::Stdout, - path => { - let file = OpenOptions::new() - .append(true) - .create(true) - .open(path) - .expect("Can't create log file!"); - - Target::Pipe(Box::new(file)) - } - }; - - env_logger::Builder::from_env(Env::default() - .write_style_or("RUST_LOG_STYLE", "always")) - .format_file(true) - .format_line_number(true) - .filter(None, config.log_config.to_level_filter()) - .target(target) - .init(); - - debug!("Loaded ForwardProxyConfig: {:?}", config); + debug!(name: "FPConfig", value = ?config); config } fn main() { let config = load_config(); + // let influxdb_client = InfluxDBClient::new(&config.influxdb_config); - info!("Starting server..."); + // Initialize the async runtime + let rt = Runtime::new().unwrap(); + rt.block_on(Statistics::init_influxdb_client(&config.influxdb_config)); + + + let _logger_guard = utils::log::init_logger( + config.log_config.log_level.clone(), + config.log_config.log_format.clone(), + config.log_config.log_path.clone(), + config.log_config.log_filename.clone(), + ); let mut server = Server::new(Some(Opt { conf: std::env::var("SERVER_CONF").ok(), @@ -59,12 +48,14 @@ fn main() { let mut proxy = http_proxy_service( &server.configuration, - ForwardProxy::new(config.tls_config, fp_handler) + ForwardProxy::new(config.tls_config, fp_handler), ); proxy.add_tcp(&format!("{}:{}", config.listen_address, config.listen_port)); server.add_service(proxy); + info!("Starting server at {}:{}", config.listen_address, config.listen_port); + server.run_forever(); } diff --git a/forward-proxy/src/proxy.rs b/forward-proxy/src/proxy.rs index 39e32d0..fcec0b8 100644 --- a/forward-proxy/src/proxy.rs +++ b/forward-proxy/src/proxy.rs @@ -1,7 +1,11 @@ +use crate::config::TlsConfig; +use crate::handler::ForwardHandler; +use crate::handler::consts::{CtxKeys, HeaderKeys, LogTypes, RequestPaths}; +use crate::handler::types::response::ErrorResponse; +use crate::statistics::Statistics; use async_trait::async_trait; use boring::x509::X509; use bytes::Bytes; -use log::{debug, error, info}; use pingora::Error; use pingora::OrErr; use pingora::http::{RequestHeader, ResponseHeader, StatusCode}; @@ -9,16 +13,13 @@ use pingora::listeners::tls::TLS_CONF_ERR; use pingora::prelude::{HttpPeer, ProxyHttp, Session}; use pingora::upstreams::peer::PeerOptions; use pingora::utils::tls::CertKey; +use pingora_error::ErrorType; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; +use pingora_router::handler::ResponseBodyTrait; use reqwest::header::TRANSFER_ENCODING; use std::sync::Arc; use std::time::Duration; -use pingora_error::ErrorType; -use pingora_router::handler::{ResponseBodyTrait}; -use crate::config::TlsConfig; -use crate::handler::consts::HeaderKeys; -use crate::handler::{consts, ForwardHandler}; -use crate::handler::types::response::ErrorResponse; +use tracing::{debug, error, info}; pub struct ForwardProxy { tls_config: TlsConfig, @@ -44,38 +45,6 @@ impl ProxyHttp for ForwardProxy { Layer8Context::default() } - fn fail_to_connect( - &self, - _session: &mut Session, - peer: &HttpPeer, - ctx: &mut Self::CTX, - mut e: Box, - ) -> Box { - let mut retry = false; - if e.etype == ErrorType::ConnectTimedout || e.etype == ErrorType::ConnectError - || e.etype == ErrorType::ConnectRefused { - let mut addrs = ctx.get(&consts::CtxKeys::UpstreamAddress.to_string()) - .unwrap_or(&"".to_string()) - .clone(); - - // remove failed socket address from the list - let idx = addrs.find(","); - if let Some(idx) = idx { - // set retry=true to recall Self::upstream_peer to try next address - retry = true; - addrs = addrs[idx + 1..].to_string(); - - ctx.set(consts::CtxKeys::UpstreamAddress.to_string(), addrs); - } - error!( - "Failed to connect to upstream addr: {}, err: {}, retry: {}", - peer._address.to_string(), e, retry - ); - } - e.set_retry(retry); - e - } - async fn upstream_peer( &self, _session: &mut Session, @@ -94,27 +63,55 @@ impl ProxyHttp for ForwardProxy { // // Code below is for step 4(this is a client to RP), presenting the client's TLS certificate. - let addrs = ctx.get(&consts::CtxKeys::UpstreamAddress.to_string()).unwrap_or(&"".to_string()).clone(); - let sni = ctx.get(&consts::CtxKeys::UpstreamSNI.to_string()).unwrap_or(&"".to_string()).clone(); - debug!("upstream_addr: {}, upstream_sni: {}", addrs, sni); + let correlation_id = ctx.get_correlation_id(); + + let addrs = ctx + .get(&CtxKeys::UPSTREAM_ADDRESS.to_string()) + .unwrap_or(&"".to_string()) + .clone(); + let sni = ctx + .get(&CtxKeys::UPSTREAM_SNI.to_string()) + .unwrap_or(&"".to_string()) + .clone(); + info!( + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, + addresses = addrs, + sni = sni + ); // HttpPeer cannot connect to upstream without a valid socket(IP:PORT) address. // A dns name can resolve to multiple socket addresses. // We will try to connect to each address until one succeeds. let mut address_list: Vec<&str> = addrs.split(',').collect(); + let enable_tls = self.tls_config.enable_tls; // clone for move into closure + let upstream_sni = sni.to_string(); // clone for move into closure let mut opt_peer = None; for addr in address_list.clone() { - match std::panic::catch_unwind(|| { - HttpPeer::new(addr, self.tls_config.enable_tls, sni.to_string()) - }) { + match std::panic::catch_unwind(|| HttpPeer::new(addr, enable_tls, upstream_sni.clone())) + { Ok(p) => { + info!( + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, + "Created HttpPeer for addr: {}", addr + ); opt_peer = Some(p); break; } Err(err) => { - error!("Panic occurred while creating HttpPeer for {}: {:?}", addr, err); + error!( + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, + "Panic occurred while creating HttpPeer for addr: {}, error: {:?}", + addr, + err + ); address_list.retain(|&x| x != addr); - ctx.set(consts::CtxKeys::UpstreamAddress.to_string(), address_list.join(",")); + ctx.set( + CtxKeys::UPSTREAM_ADDRESS.to_string(), + address_list.join(","), + ); } } } @@ -122,7 +119,11 @@ impl ProxyHttp for ForwardProxy { let mut peer = match opt_peer { Some(p) => p, None => { - error!("Failed to create HttpPeer for any socket address"); + error!( + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, + "Failed to create HttpPeer for any socket address" + ); return Err(Error::new(ErrorType::ConnectError)); } }; @@ -134,8 +135,9 @@ impl ProxyHttp for ForwardProxy { let ca_cert = X509::from_pem(&self.tls_config.ca_cert.clone().into_bytes()) .or_err(TLS_CONF_ERR, "Failed to load CA certificate")?; - let key = boring::pkey::PKey::private_key_from_pem(&self.tls_config.key.clone().into_bytes()) - .or_err(TLS_CONF_ERR, "Failed to load private key")?; + let key = + boring::pkey::PKey::private_key_from_pem(&self.tls_config.key.clone().into_bytes()) + .or_err(TLS_CONF_ERR, "Failed to load private key")?; // The certificate to present in mTLS connections to upstream // The organization implementing mTLS acts as its own certificate authority. @@ -166,14 +168,21 @@ impl ProxyHttp for ForwardProxy { { // create Context ctx.update(session).await?; - let request_summary = session.request_summary(); - println!(); - info!("[REQUEST {}] {:?}", request_summary, ctx.request); - println!(); + let correlation_id = ctx.set_correlation_id(); + + info!( + %correlation_id, + log_type = LogTypes::ACCESS_LOG, + request_summary = session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), + user_agent = ctx.request.header.get("User-Agent"), + ); match session.req_header().method { pingora::http::Method::OPTIONS => { // Handle CORS preflight request + ctx.response.status = StatusCode::NO_CONTENT; let mut header = ResponseHeader::build(StatusCode::NO_CONTENT, None)?; header.insert_header("Access-Control-Allow-Origin", "*")?; header.insert_header("Access-Control-Allow-Methods", "POST")?; @@ -188,9 +197,9 @@ impl ProxyHttp for ForwardProxy { let mut error_response_bytes: Vec = vec![]; match ( session.req_header().uri.path(), - session.req_header().method.as_str() + session.req_header().method.as_str(), ) { - ("/healthcheck", "GET") => { + (RequestPaths::HEALTHCHECK, "GET") => { let handler_response = self.handler.handle_healthcheck(ctx); let mut header = ResponseHeader::build(handler_response.status, None)?; let response_headers = header.headers.clone(); @@ -198,9 +207,17 @@ impl ProxyHttp for ForwardProxy { header .insert_header(key.clone(), val.clone()) .map_err(|e| { - error!("Cannot add request header {}:{:?}, err: {:?}", key.clone(), val.clone(), e) - }).unwrap_or_default(); - }; + error!( + %correlation_id, + log_type = LogTypes::HEALTHCHECK, + "Cannot add request header {}:{:?}, err: {:?}", + key.clone(), + val.clone(), + e + ) + }) + .unwrap_or_default(); + } let mut response_bytes = vec![]; if let Some(body_bytes) = handler_response.body { @@ -212,15 +229,6 @@ impl ProxyHttp for ForwardProxy { session.write_response_header_ref(&header).await?; - println!(); - info!("[RESPONSE {}] Header: {:?}", request_summary, header.headers); - info!( - "[RESPONSE {}] Body: {}", - request_summary, - String::from_utf8_lossy(&*response_bytes) - ); - println!(); - // Write the response body to the session after setting headers session .write_response_body(Some(Bytes::from(response_bytes)), true) @@ -228,76 +236,71 @@ impl ProxyHttp for ForwardProxy { return Ok(true); } - ("/init-tunnel", "POST") => { + (RequestPaths::INIT_TUNNEL, "POST") => { if let Some(url) = ctx.param("backend_url") { if let Some(url) = utils::validate_url(url) { - let socket_addr = url.socket_addrs(|| None).unwrap_or_default() - .iter() - .map(|addr| addr.to_string()) - .collect::>() - .join(","); + let socket_addr = utils::get_socket_addrs(&url); + ctx.set(CtxKeys::UPSTREAM_ADDRESS.to_string(), socket_addr); ctx.set( - consts::CtxKeys::UpstreamAddress.to_string(), - socket_addr, - ); - ctx.set( - consts::CtxKeys::UpstreamSNI.to_string(), + CtxKeys::UPSTREAM_SNI.to_string(), url.domain().unwrap_or_default().to_string(), ); } else { error_response_bytes = ErrorResponse { - error: "Invalid backend_url".to_string() - }.to_bytes(); + error: "Invalid backend_url".to_string(), + } + .to_bytes(); } } else { error_response_bytes = ErrorResponse { - error: "backend_url is a required param".to_string() - }.to_bytes(); + error: "backend_url is a required param".to_string(), + } + .to_bytes(); } } - ("/proxy", "POST") => { - error_response_bytes = match ctx.get_request_header().get(HeaderKeys::IntFpJwtKey.as_str()) { - None => { - ErrorResponse { - error: "Missing int_fp_jwt header".to_string() - }.to_bytes() + (RequestPaths::PROXY, "POST") => { + error_response_bytes = match ctx.get_request_header().get(HeaderKeys::INT_FP_JWT) { + None => ErrorResponse { + error: "Missing int_fp_jwt header".to_string(), } - Some(int_fp_jwt) => { - match self.handler.verify_int_fp_jwt(int_fp_jwt.as_str()) { - Ok(session) => { - debug!("IntFPSession: {:?}", session); - ctx.set(consts::CtxKeys::FpRpJwt.to_string(), session.fp_rp_jwt); - - if let Some(url) = utils::validate_url(&session.rp_base_url) { - let socket_addr = url.socket_addrs(|| None).unwrap_or_default() - .iter() - .map(|addr| addr.to_string()) - .collect::>() - .join(","); - ctx.set( - consts::CtxKeys::UpstreamAddress.to_string(), - socket_addr, - ); - ctx.set( - consts::CtxKeys::UpstreamSNI.to_string(), - url.domain().unwrap_or_default().to_string(), - ); - vec![] - } else { - ErrorResponse { - error: "Invalid backend_url".to_string() - }.to_bytes() + .to_bytes(), + Some(int_fp_jwt) => match self.handler.verify_int_fp_jwt(int_fp_jwt.as_str()) { + Ok(session) => { + debug!(%correlation_id, "IntFPSession: {:?}", session); + ctx.set(CtxKeys::FP_RP_JWT.to_string(), session.fp_rp_jwt); + ctx.set( + CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string(), + session.client_id, + ); + + if let Some(url) = utils::validate_url(&session.rp_base_url) { + let socket_addr = utils::get_socket_addrs(&url); + ctx.set(CtxKeys::UPSTREAM_ADDRESS.to_string(), socket_addr); + ctx.set( + CtxKeys::UPSTREAM_SNI.to_string(), + url.domain().unwrap_or_default().to_string(), + ); + vec![] + } else { + ErrorResponse { + error: "Invalid backend_url".to_string(), } - } - Err(err) => { - error!("Error verifying int_fp_jwt: {}", err); - ErrorResponse { error: err }.to_bytes() + .to_bytes() } } - } + Err(err) => { + error!( + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + "Error verifying int_fp_jwt: {}", err + ); + ErrorResponse { error: err }.to_bytes() + } + }, } } _ => { + ctx.response.status = StatusCode::NOT_FOUND; let header = ResponseHeader::build(StatusCode::NOT_FOUND, None)?; session.write_response_header_ref(&header).await?; session.set_keepalive(None); @@ -306,10 +309,13 @@ impl ProxyHttp for ForwardProxy { } if error_response_bytes.len() > 0 { - error!("[RESPONSE] Error: {}", utils::bytes_to_string(&error_response_bytes)); + ctx.response.status = StatusCode::BAD_REQUEST; + ctx.set_response_body(error_response_bytes.clone()); let header = ResponseHeader::build(StatusCode::BAD_REQUEST, None)?; session.write_response_header_ref(&header).await?; - session.write_response_body(Some(Bytes::from(error_response_bytes)), true).await?; + session + .write_response_body(Some(Bytes::from(error_response_bytes)), true) + .await?; session.set_keepalive(None); return Ok(true); } @@ -334,21 +340,26 @@ impl ProxyHttp for ForwardProxy { } if end_of_stream { + let correlation_id = ctx.get_correlation_id(); + info!( - "[REQUEST {}] Decoded body: {}", - session.request_summary(), - String::from_utf8_lossy(&*ctx.get_request_body()), + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Request Body Received: {} bytes.", + &ctx.get_request_body().len() ); // This is the last chunk, we can process the data now let handler_response = match session.req_header().uri.path() { - "/init-tunnel" => self.handler.handle_init_tunnel_request(ctx).await, + RequestPaths::INIT_TUNNEL => self.handler.handle_init_tunnel_request(ctx).await, _ => { info!( - "[FORWARD {}] FP forward request body: {}", - session.request_summary(), - utils::bytes_to_string(&ctx.get_request_body()) + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Forward proxy passing through request body unchanged." ); *body = Some(Bytes::copy_from_slice(ctx.get_request_body().as_slice())); return Ok(()); @@ -357,20 +368,29 @@ impl ProxyHttp for ForwardProxy { if handler_response.status != StatusCode::OK { error!( - "[FORWARD {}] Error in request handler with status: {}, error: {}", - session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Failed to handle init-tunnel request with status: {}, error: {}", handler_response.status, utils::bytes_to_string(&handler_response.body.unwrap_or_default()) ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(handler_response.status)), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(handler_response.status), + ))); } info!( - "[FORWARD {}] Request handler response: status: {}, body: {}", - session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Handle init-tunnel Request response with status: {}", handler_response.status, + ); + debug!( + %correlation_id, + request_summary = session.request_summary(), + "Handle init-tunnel response body: {}", utils::bytes_to_string(&handler_response.body.as_ref().unwrap_or(&vec![])) ); let fp_req_body = handler_response.body.as_ref().unwrap_or(&vec![]).clone(); @@ -385,66 +405,72 @@ impl ProxyHttp for ForwardProxy { &self, session: &mut Session, upstream_request: &mut RequestHeader, - _ctx: &mut Self::CTX, + ctx: &mut Self::CTX, ) -> pingora::Result<()> where Self::CTX: Send + Sync, { + let correlation_id = ctx.get_correlation_id(); + match session.req_header().uri.path() { - "/proxy" => { - match upstream_request.headers.get(HeaderKeys::IntFpJwtKey.as_str()) { - None => { + RequestPaths::PROXY => match upstream_request.headers.get(HeaderKeys::INT_FP_JWT) { + None => { + error!( + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Missing required header: {}", + HeaderKeys::INT_FP_JWT + ); + + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::BAD_REQUEST), + ))); + } + Some(token) => { + let token_str = token.to_str().or_err( + pingora::ErrorType::InvalidHTTPHeader, + "Invalid header value for token", + )?; + + if token_str.is_empty() { error!( - "[REQUEST {}] Missing required header: {}", - session.request_summary(), - HeaderKeys::IntFpJwtKey.as_str() + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "{} token is empty", + HeaderKeys::INT_FP_JWT ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(StatusCode::BAD_REQUEST)), - )); + + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::BAD_REQUEST), + ))); } - Some(token) => { - let token_str = token.to_str().or_err( - pingora::ErrorType::InvalidHTTPHeader, - "Invalid header value for token", - )?; - if token_str.is_empty() { + match self.handler.verify_int_fp_jwt(token_str) { + Ok(session) => { + upstream_request + .insert_header(HeaderKeys::FP_RP_JWT, session.fp_rp_jwt) + .unwrap_or_default(); + upstream_request.remove_header(HeaderKeys::INT_FP_JWT); + } + Err(err) => { error!( - "[REQUEST {}] Empty value for header: {}", - session.request_summary(), - HeaderKeys::IntFpJwtKey.as_str() + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Error verifying {} token: {}", + HeaderKeys::INT_FP_JWT, + err ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(StatusCode::BAD_REQUEST)), + return Err(pingora::Error::explain( + pingora::ErrorType::InvalidHTTPHeader, + err, )); } - - match self.handler.verify_int_fp_jwt(token_str) { - Ok(session) => { - upstream_request - .insert_header(HeaderKeys::FpRpJwtKey.as_str(), session.fp_rp_jwt) - .unwrap_or_default(); - upstream_request.remove_header(HeaderKeys::IntFpJwtKey.as_str()); - } - Err(err) => { - error!( - "[REQUEST {}] Error verify {} token: {}", - session.request_summary(), - HeaderKeys::IntFpJwtKey.as_str(), - err - ); - return Err( - pingora::Error::explain( - pingora::ErrorType::InvalidHTTPHeader, - err, - ) - ); - } - } } } - } + }, _ => {} } @@ -455,6 +481,10 @@ impl ProxyHttp for ForwardProxy { .unwrap_or_default(); } + upstream_request + .insert_header("x-correlation-id", correlation_id) + .unwrap_or_default(); + Ok(()) } @@ -463,8 +493,7 @@ impl ProxyHttp for ForwardProxy { _session: &mut Session, upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX, - ) -> pingora::Result<()> - { + ) -> pingora::Result<()> { upstream_response.insert_header("Access-Control-Allow-Origin", "*")?; upstream_response.insert_header("Access-Control-Allow-Methods", "POST")?; upstream_response.insert_header("Access-Control-Allow-Headers", "*")?; @@ -496,20 +525,25 @@ impl ProxyHttp for ForwardProxy { } if end_of_stream { + let correlation_id = ctx.get_correlation_id(); + // This is the last chunk, we can process the data now info!( - "[FORWARD {}] RP Response decoded body: {}", - session.request_summary(), - String::from_utf8_lossy(&*ctx.get_response_body()), + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), + "Response Body Received: {} bytes.", + &ctx.get_response_body().len(), ); let handler_response = match session.req_header().uri.path() { - "/init-tunnel" => self.handler.handle_init_tunnel_response(ctx), + RequestPaths::INIT_TUNNEL => self.handler.handle_init_tunnel_response(ctx), _ => { info!( - "[RESPONSE {}] FP forward response body: {}", - session.request_summary(), - utils::bytes_to_string(&ctx.get_response_body()) + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), + "Forward proxy passing through response body unchanged." ); *body = Some(Bytes::copy_from_slice(ctx.get_response_body().as_slice())); return Ok(None); @@ -518,29 +552,125 @@ impl ProxyHttp for ForwardProxy { if handler_response.status != StatusCode::OK { error!( - "[RESPONSE {}] Error in response handler with status: {}, error: {}", - session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), + "Failed to handle init-tunnel Response response with status: {}, error: {}", handler_response.status, utils::bytes_to_string(&handler_response.body.unwrap_or_default()) ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus( - u16::from(StatusCode::INTERNAL_SERVER_ERROR) - ), - )); + + ctx.response.status = StatusCode::INTERNAL_SERVER_ERROR; + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::INTERNAL_SERVER_ERROR), + ))); } info!( - "[RESPONSE {}] FP response with status: {}, body: {}", - session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), + "Handle init-tunnel Response response with status: {}", handler_response.status, - utils::bytes_to_string(&handler_response.body.as_ref().unwrap_or(&vec![])) ); + let fp_res_body = handler_response.body.as_ref().unwrap_or(&vec![]).clone(); + ctx.response.status = handler_response.status; + ctx.set_response_body(fp_res_body.clone()); *body = Some(Bytes::copy_from_slice(fp_res_body.as_slice())); } Ok(None) } + + async fn logging(&self, session: &mut Session, e: Option<&Error>, ctx: &mut Self::CTX) + where + Self::CTX: Send + Sync, + { + let correlation_id = ctx.get_correlation_id(); + + let mut status = ctx.response.status.as_u16(); + if let Some(_err) = e { + status = session.response_written().unwrap().status.as_u16(); + } + + if session.req_header().method.as_str() == "POST" + && (session.req_header().uri.path() == RequestPaths::PROXY + || session.req_header().uri.path() == RequestPaths::INIT_TUNNEL) + { + let client_id = ctx + .get(&CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string()) + .unwrap_or(&"".to_string()) + .clone(); + let request_path = session.req_header().uri.path().to_string(); + let total_byte_transferred = + (ctx.get_request_body().len() + ctx.get_response_body().len()) as i64; + let correlation_id = correlation_id.clone(); + + tokio::spawn(async move { + Statistics::update( + client_id, + correlation_id, + request_path, + total_byte_transferred, + status, + ) + .await; + }); + } + + info!( + %correlation_id, + log_type=LogTypes::ACCESS_LOG_RESULT, + request_summary=session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), + status=status, + latency_ms=ctx.get_latency_ms(), // todo: is it necessary? + response_body_size=ctx.get_response_body().len(), + user_agent=ctx.request.header.get("User-Agent"), + error=?e, + ); + } + + fn fail_to_connect( + &self, + _session: &mut Session, + peer: &HttpPeer, + ctx: &mut Self::CTX, + mut e: Box, + ) -> Box { + let mut retry = false; + if e.etype == ErrorType::ConnectTimedout + || e.etype == ErrorType::ConnectError + || e.etype == ErrorType::ConnectRefused + { + let mut addrs = ctx + .get(&CtxKeys::UPSTREAM_ADDRESS.to_string()) + .unwrap_or(&"".to_string()) + .clone(); + + // remove failed socket address from the list + let idx = addrs.find(","); + if let Some(idx) = idx { + // set retry=true to recall Self::upstream_peer to try next address + retry = true; + addrs = addrs[idx + 1..].to_string(); + + ctx.set(CtxKeys::UPSTREAM_ADDRESS.to_string(), addrs); + } + + error!( + correlation_id = ctx.get_correlation_id(), + log_type = LogTypes::UPSTREAM_CONNECT, + "Failed to connect to upstream addr: {}, err: {}, retry: {}", + peer._address.to_string(), + e, + retry + ); + } + e.set_retry(retry); + e + } } diff --git a/forward-proxy/src/statistics/influxdb_client.rs b/forward-proxy/src/statistics/influxdb_client.rs new file mode 100644 index 0000000..5cff04b --- /dev/null +++ b/forward-proxy/src/statistics/influxdb_client.rs @@ -0,0 +1,120 @@ +use crate::config::InfluxDBConfig; +use crate::handler::consts::RequestPaths; +use crate::statistics::InfluxDBMeasurements; +use futures::stream; +use influxdb2::Client; +use influxdb2::models::DataPoint; +use pingora::http::StatusCode; +use std::error::Error; + +pub struct InfluxDBClient { + client: Client, + bucket: String, +} + +impl InfluxDBClient { + pub fn new(config: &InfluxDBConfig) -> Self { + let influxdb_client = Client::new( + &config.influxdb_url, + &config.influxdb_org, + &config.influxdb_auth_token, + ); + InfluxDBClient { + client: influxdb_client, + bucket: config.influxdb_bucket.clone(), + } + } + + pub async fn update_statistics( + &self, + client_id: String, + request_path: String, + total_byte_transferred: i64, + response_status: u16, + ) -> Result<(), Box> { + self.increase_total_request(&client_id).await?; + + if response_status == StatusCode::OK { + return match request_path.as_str() { + RequestPaths::PROXY => { + self.add_total_byte_transferred(&client_id, total_byte_transferred) + .await?; + + self.increase_total_success(&client_id).await + } + RequestPaths::INIT_TUNNEL => self.increase_total_tunnel_initiated(&client_id).await, + _ => Ok(()), + }; + } + + Ok(()) + } + + async fn update_counter( + &self, + measurement: &str, + client_id: &str, + value: i64, + ) -> Result<(), Box> { + // Create a data point + let point = DataPoint::builder(measurement) + .tag("client_id", client_id) + .field("counter", value) + .build() + .map_err(|e| { + Box::::from(format!( + "Failed to increase counter for {}: {:?}", + measurement, e + )) + })?; + + // Write to bucket + self.client + .write(self.bucket.as_str(), stream::iter(vec![point])) + .await + .map_err(|e| { + Box::::from(format!( + "Failed to write counter for {}: {:?}", + measurement, e + )) + })?; + Ok(()) + } + + async fn add_total_byte_transferred( + &self, + client_id: &str, + bytes_size: i64, + ) -> Result<(), Box> { + self.update_counter( + InfluxDBMeasurements::TOTAL_BYTE_TRANSFERRED, + client_id, + bytes_size, + ) + .await + } + + async fn increase_total_tunnel_initiated( + &self, + client_id: &str, + ) -> Result<(), Box> { + self.update_counter(InfluxDBMeasurements::TOTAL_TUNNEL_INITIATED, client_id, 1) + .await + } + + async fn increase_total_request( + &self, + client_id: &str, + ) -> Result<(), Box> { + self.update_counter(InfluxDBMeasurements::TOTAL_REQUEST, client_id, 1) + .await + } + + async fn increase_total_success( + &self, + client_id: &str, + ) -> Result<(), Box> { + self.update_counter(InfluxDBMeasurements::TOTAL_SUCCESS, client_id, 1) + .await + } +} diff --git a/forward-proxy/src/statistics/mod.rs b/forward-proxy/src/statistics/mod.rs new file mode 100644 index 0000000..ec9f22b --- /dev/null +++ b/forward-proxy/src/statistics/mod.rs @@ -0,0 +1,57 @@ +mod influxdb_client; + +use crate::config::InfluxDBConfig; +use crate::handler::consts::LogTypes; +use crate::statistics::influxdb_client::InfluxDBClient; +use futures::TryFutureExt; +use once_cell::sync::Lazy; +use tokio::sync::Mutex; +use tracing::error; + +struct InfluxDBMeasurements; + +impl InfluxDBMeasurements { + const TOTAL_BYTE_TRANSFERRED: &'static str = "total_byte_transferred"; + const TOTAL_TUNNEL_INITIATED: &'static str = "total_tunnel_initiated"; + const TOTAL_SUCCESS: &'static str = "total_success"; + const TOTAL_REQUEST: &'static str = "total_request"; +} + +static INFLUXDB_CLIENT: Lazy>> = Lazy::new(|| Mutex::new(None)); + +pub struct Statistics; + +impl Statistics { + pub async fn init_influxdb_client(config: &InfluxDBConfig) { + let mut influxdb_client = INFLUXDB_CLIENT.lock().await; + *influxdb_client = Some(InfluxDBClient::new(&config)); + } + + pub async fn update( + client_id: String, + correlation_id: String, + request_path: String, + total_byte_transferred: i64, + response_status: u16, + ) { + let client = INFLUXDB_CLIENT.lock().await; + if let Some(ref influxdb_client) = *client { + influxdb_client + .update_statistics( + client_id, + request_path, + total_byte_transferred, + response_status, + ) + .map_err(|e| { + error!( + %correlation_id, + log_type = LogTypes::INFLUXDB, + "Failed to update statistics: {:?}", e + ); + }) + .await + .ok(); + } + } +} diff --git a/pingora-router/Cargo.toml b/pingora-router/Cargo.toml index 4da2e2f..6be7d3e 100644 --- a/pingora-router/Cargo.toml +++ b/pingora-router/Cargo.toml @@ -4,8 +4,9 @@ version = "0.1.0" edition = "2024" [dependencies] -log = "0.4.27" futures = "0.3.31" pingora = { version = "0.5.0", features = ["lb", "boringssl"] } serde = "1.0.219" serde_json = "1.0.140" +chrono = "0.4.40" +uuid = "1.16.0" \ No newline at end of file diff --git a/pingora-router/src/ctx.rs b/pingora-router/src/ctx.rs index b1a0608..1426926 100644 --- a/pingora-router/src/ctx.rs +++ b/pingora-router/src/ctx.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; -use pingora::http::{Method, RequestHeader}; +use std::time::Instant; +use pingora::http::{Method, RequestHeader, StatusCode}; use pingora::proxy::Session; use crate::utils::get_request_body; +use uuid; /* * Each type in this crate serves a specific purpose and may be updated as requirements evolve. @@ -68,6 +70,7 @@ impl Layer8ContextRequest { /// shared across handlers during request processing #[derive(Debug, Clone, Default)] pub struct Layer8ContextResponse { + pub status: StatusCode, pub header: Layer8Header, body: Vec, } @@ -82,7 +85,7 @@ pub struct Layer8ContextResponse { /// This struct is designed to provide a unified interface for accessing and modifying /// request and response data, as well as sharing state across middleware and handlers. /// All fields are private and should be accessed or modified only through dedicated `get` and `set` methods. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct Layer8Context { /// `request`: contains all relevant request information needed for processing and handler access pub request: Layer8ContextRequest, @@ -93,6 +96,18 @@ pub struct Layer8Context { /// during request processing. /// Accessed via `get(&self, key: &str)` and `set(&mut self, key: String, value: String)` methods memory: HashMap, + pub latency_start: Instant, // todo: remove if not needed +} + +impl Default for Layer8Context { + fn default() -> Self { + Self { + request: Default::default(), + response: Default::default(), + memory: Default::default(), + latency_start: Instant::now(), + } + } } impl Layer8Context { @@ -189,6 +204,30 @@ impl Layer8ContextTrait for Layer8Context { fn set_request_summary(&mut self, summary: Layer8ContextRequestSummary) { self.request.summary = summary } + + fn set_correlation_id(&mut self) -> String { + let correlation_id: String; + if let Some(cid) = self.get_request_header().get("x-correlation-id") { + correlation_id = cid.clone(); + } else if let Some(cid) = self.get_request_header().get("x-request-id") { + correlation_id = cid.clone(); + } else { + correlation_id = uuid::Uuid::new_v4().to_string(); + } + + self.set("x-correlation-id".to_string(), correlation_id.clone()); + correlation_id + } + + fn get_correlation_id(&self) -> String { + self.get("x-correlation-id") + .unwrap_or(&"".to_string()) + .clone() + } + + fn get_latency_ms(&self) -> i64 { + self.latency_start.elapsed().as_nanos() as i64 + } } /// This trait appears to be redundant and could potentially be removed, @@ -213,6 +252,9 @@ pub trait Layer8ContextTrait { fn get(&self, key: &str) -> Option<&String>; fn set(&mut self, key: String, value: String); fn set_request_summary(&mut self, summary: Layer8ContextRequestSummary); + fn set_correlation_id(&mut self) -> String; + fn get_correlation_id(&self) -> String; + fn get_latency_ms(&self) -> i64; } /// `Layer8Header` is a type alias for a map of HTTP header key-value pairs used diff --git a/pingora-router/src/utils.rs b/pingora-router/src/utils.rs index 7c2ced0..5c7f346 100644 --- a/pingora-router/src/utils.rs +++ b/pingora-router/src/utils.rs @@ -1,4 +1,3 @@ -use log::error; use pingora::prelude::Session; pub(crate) async fn get_request_body(session: &mut Session) -> pingora::Result> { @@ -12,7 +11,6 @@ pub(crate) async fn get_request_body(session: &mut Session) -> pingora::Result { - error!("ERROR: {err}"); return Err(err); } } diff --git a/reverse-proxy/.env.dev b/reverse-proxy/.env.dev index 24d62b5..f40c411 100644 --- a/reverse-proxy/.env.dev +++ b/reverse-proxy/.env.dev @@ -5,7 +5,12 @@ PATH_TO_SERVER_CONF=../server_conf.yml # Logging configuration LOG_LEVEL=trace +# default to "json" if not "plain" +LOG_FORMAT=plain +# "console" or folder path LOG_PATH=console +# leave empty if LOG_PATH is console +LOG_FILENAME=reverse-proxy.log # Handler configuration NTOR_SERVER_ID=ReverseProxyServer diff --git a/reverse-proxy/.env.docker b/reverse-proxy/.env.docker index 860c4d6..4e740df 100644 --- a/reverse-proxy/.env.docker +++ b/reverse-proxy/.env.docker @@ -5,9 +5,13 @@ PATH_TO_SERVER_CONF=../server_conf.yml # Logging configuration LOG_LEVEL=trace -LOG_PATH=console +# default to "json" if not "plain" +LOG_FORMAT=plain # can be set to 'console' or a file path -#LOG_PATH=/var/log/layer8/log.txt +#LOG_PATH=/var/log/layer8 +LOG_PATH=console +# leave empty if LOG_PATH is console +LOG_FILENAME=reverse-proxy.log # Handler configuration NTOR_SERVER_ID=http://10.10.10.102:6193 diff --git a/reverse-proxy/Cargo.toml b/reverse-proxy/Cargo.toml index e629e2a..f971e24 100644 --- a/reverse-proxy/Cargo.toml +++ b/reverse-proxy/Cargo.toml @@ -9,8 +9,6 @@ bytes = "1.10.1" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_yaml = "0.8.26" -env_logger = "0.11.7" -log = "0.4.27" chrono = "0.4.40" reqwest = { version="0.11", default-features=false, features=["json", "rustls-tls"] } tokio-rustls = "0.26.2" @@ -28,3 +26,4 @@ uuid = { version = "1.16.0", features = ["v4"] } utils = { path = "../utils", version = "0.1.0" } envy = "0.4.2" hex = "0.4.3" +tracing = "0.1.41" diff --git a/reverse-proxy/Dockerfile b/reverse-proxy/Dockerfile index 8d7d21f..dd7efd3 100644 --- a/reverse-proxy/Dockerfile +++ b/reverse-proxy/Dockerfile @@ -20,7 +20,6 @@ RUN apt-get update && apt-get install -y \ WORKDIR /usr/src/app # Copy source code -COPY ./certs ./certs COPY ./pingora-router ./pingora-router COPY ./utils ./utils COPY ./reverse-proxy ./reverse-proxy @@ -44,7 +43,6 @@ RUN useradd -m layer8 # Copy only the built binary from the builder stage COPY --from=builder /usr/src/app/reverse-proxy/target/release/reverse-proxy /usr/local/bin/reverse-proxy -COPY --from=builder /usr/src/app/certs /usr/local/certs # Switch to non-root user USER layer8 diff --git a/reverse-proxy/src/config.rs b/reverse-proxy/src/config.rs index 09293d5..26a0b28 100644 --- a/reverse-proxy/src/config.rs +++ b/reverse-proxy/src/config.rs @@ -15,22 +15,13 @@ pub struct RPConfig { #[derive(Debug, Deserialize, Clone)] pub(super) struct LogConfig { - pub log_path: String, pub log_level: String, -} - -impl LogConfig { - pub fn to_level_filter(&self) -> log::LevelFilter { - match self.log_level.to_uppercase().as_str() { - "INFO" => log::LevelFilter::Info, - "DEBUG" => log::LevelFilter::Debug, - "WARNING" => log::LevelFilter::Warn, - "ERROR" => log::LevelFilter::Error, - "TRACE" => log::LevelFilter::Trace, - "OFF" => log::LevelFilter::Off, - _ => log::max_level() - } - } + /// default to "json" if not "plain" + pub log_format: String, + /// "console" or folder path + pub log_path: String, + /// required if log_path is not "console" + pub log_filename: String, } #[derive(Debug, Deserialize, Clone)] @@ -49,6 +40,5 @@ pub(super) struct HandlerConfig { pub jwt_virtual_connection_secret: Vec, #[serde(deserialize_with = "utils::deserializer::string_to_number")] pub jwt_exp_in_hours: i64, - pub forward_proxy_url: Option, pub backend_url: String, } diff --git a/reverse-proxy/src/handler/common/consts.rs b/reverse-proxy/src/handler/common/consts.rs index ec1f1ba..e85559a 100644 --- a/reverse-proxy/src/handler/common/consts.rs +++ b/reverse-proxy/src/handler/common/consts.rs @@ -1,16 +1,19 @@ -// can be replaced by constants, will see -pub enum HeaderKeys { - FpRpJwtKey, - IntRpJwtKey, -} +pub struct HeaderKeys; impl HeaderKeys { - pub fn as_str(&self) -> &'static str { - match self { - HeaderKeys::FpRpJwtKey => "fp_rp_jwt", - HeaderKeys::IntRpJwtKey => "int_rp_jwt" - } - } + pub const FP_RP_JWT: &'static str = "fp_rp_jwt"; + pub const INT_RP_JWT_KEY: &'static str = "int_rp_jwt"; } -pub const INIT_TUNNEL_TO_BACKEND_PATH: &str = "/init-tunnel"; \ No newline at end of file +pub struct LogTypes; + +impl LogTypes { + pub const ACCESS_LOG: &'static str = "ACCESS_LOG"; + pub const ACCESS_LOG_RESULT: &'static str = "ACCESS_LOG_RESULT"; + pub const HANDLE_INIT_TUNNEL_REQUEST: &'static str = "HANDLE_INIT_TUNNEL_REQUEST"; + pub const HANDLE_PROXY_REQUEST: &'static str = "HANDLE_PROXY_REQUEST"; + pub const HANDLE_BACKEND_RESPONSE: &'static str = "HANDLE_BACKEND_RESPONSE"; + #[allow(dead_code)] + pub const HEALTHCHECK: &'static str = "HEALTHCHECK"; + pub const TLS_HANDSHAKE: &'static str = "TLS_HANDSHAKE"; +} \ No newline at end of file diff --git a/reverse-proxy/src/handler/common/handler.rs b/reverse-proxy/src/handler/common/handler.rs index ad65437..b2cd99c 100644 --- a/reverse-proxy/src/handler/common/handler.rs +++ b/reverse-proxy/src/handler/common/handler.rs @@ -1,3 +1,4 @@ /// Struct containing only associated methods (no instance methods or fields). /// The contents are quite drafting, but the idea is to handle common operations +#[allow(dead_code)] pub struct CommonHandler {} \ No newline at end of file diff --git a/reverse-proxy/src/handler/init_tunnel/handler.rs b/reverse-proxy/src/handler/init_tunnel/handler.rs index 8898a63..4811e9c 100644 --- a/reverse-proxy/src/handler/init_tunnel/handler.rs +++ b/reverse-proxy/src/handler/init_tunnel/handler.rs @@ -1,11 +1,8 @@ -use log::{error, info}; use pingora::http::StatusCode; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, ResponseBodyTrait}; -use reqwest::Client; -use crate::handler::common::consts::INIT_TUNNEL_TO_BACKEND_PATH; use crate::handler::common::types::ErrorResponse; -use crate::handler::init_tunnel::{InitEncryptedTunnelRequest, InitTunnelRequestToBackend}; +use crate::handler::init_tunnel::{InitEncryptedTunnelRequest}; /// Struct containing only associated methods (no instance methods or fields) pub(crate) struct InitTunnelHandler {} @@ -15,7 +12,7 @@ impl DefaultHandlerTrait for InitTunnelHandler {} impl InitTunnelHandler { pub(crate) async fn validate_request_body( ctx: &mut Layer8Context, - backend_url: String, + _backend_url: String, ) -> Result { return match InitTunnelHandler::parse_request_body::< @@ -30,8 +27,6 @@ impl InitTunnelHandler { Some(err_response) => Some(err_response.to_bytes()) }; - InitTunnelHandler::send_result_to_be(backend_url, false).await; - Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, body, @@ -39,30 +34,4 @@ impl InitTunnelHandler { } }; } - - pub(crate) async fn send_result_to_be(backend_url: String, result: bool) { - let body = InitTunnelRequestToBackend { - success: result, - }; - - let request_url = format!("{backend_url}{INIT_TUNNEL_TO_BACKEND_PATH}"); - - let log_meta = format!("[FORWARD {}]", request_url); - info!("{log_meta} request to BE body: {:?}", body); - - let client = Client::new(); - match client.post(request_url) - .header("Content-Type", "application/json") - .json(&body) - .send() - .await - { - Ok(res) => { - info!("{log_meta} Response sending init-tunnel result to BE: {:?}", res) - } - Err(err) => { - error!("{log_meta} Error sending init-tunnel result to BE: {:?}", err) - } - } - } } diff --git a/reverse-proxy/src/handler/mod.rs b/reverse-proxy/src/handler/mod.rs index 1e3676d..9a4dec3 100644 --- a/reverse-proxy/src/handler/mod.rs +++ b/reverse-proxy/src/handler/mod.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; use std::sync::{Mutex, MutexGuard}; -use log::debug; use ntor::common::{InitSessionMessage, NTorParty}; use ntor::server::NTorServer; use pingora::http::StatusCode; +use tracing::{debug, info}; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use pingora_router::handler::{APIHandlerResponse, ResponseBodyTrait}; use init_tunnel::handler::InitTunnelHandler; @@ -12,9 +12,10 @@ use init_tunnel::InitEncryptedTunnelResponse; use utils::{new_uuid}; use utils::jwt::JWTClaims; use crate::config::{HandlerConfig, RPConfig}; +use crate::handler::common::consts::LogTypes; use crate::handler::healthcheck::{RpHealthcheckError, RpHealthcheckSuccess}; -mod common; +pub(crate) mod common; mod init_tunnel; mod proxy; mod healthcheck; @@ -60,6 +61,8 @@ impl ReverseHandler { } pub async fn handle_init_tunnel(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { + let correlation_id = ctx.get_correlation_id(); + // validate request body let request_body = match InitTunnelHandler::validate_request_body( ctx, @@ -68,7 +71,6 @@ impl ReverseHandler { Ok(res) => res, Err(res) => return res }; - debug!("[REQUEST /init-tunnel] Parsed body: {:?}", request_body); // todo I think there are prettier ways to use nTor since we are free to modify the nTor crate, but I'm lazy let mut ntor_server = NTorServer::new_with_secret( @@ -109,8 +111,13 @@ impl ReverseHandler { fp_rp_jwt, }; - InitTunnelHandler::send_result_to_be(self.config.backend_url.clone(), true).await; - + // InitTunnelHandler::send_result_to_be(self.config.backend_url.clone(), true).await; + info!( + %correlation_id, + log_type=LogTypes::HANDLE_INIT_TUNNEL_REQUEST, + "Save new nTor session: {}", + ntor_session_id + ); NTOR_SHARED_SECRETS.with(|memory| { let mut guard: MutexGuard>> = memory.lock().unwrap(); guard.insert(ntor_session_id, ntor_server.get_shared_secret().unwrap_or_default()); @@ -123,6 +130,7 @@ impl ReverseHandler { } pub async fn handle_proxy_request(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { + let correlation_id = ctx.get_correlation_id(); // validate request headers (nTor session ID) let session_id = match ProxyHandler::validate_request_headers(ctx, &self.jwt_secret) { @@ -149,10 +157,16 @@ impl ReverseHandler { Ok(req) => req, Err(res) => return res, }; - debug!("[REQUEST /proxy] Decrypted request: {:?}", wrapped_request); + + info!( + %correlation_id, + log_type=LogTypes::HANDLE_INIT_TUNNEL_REQUEST, + "Decrypted request body and forward to backend", + ); // reconstruct user request let wrapped_response = match ProxyHandler::rebuild_user_request( + ctx, self.config.backend_url.clone(), wrapped_request, ).await { @@ -160,8 +174,6 @@ impl ReverseHandler { Err(res) => return res, }; - debug!("[RESPONSE /proxy] Wrapped Backend response: {:?}", wrapped_response); - return match ProxyHandler::encrypt_response_body( wrapped_response, self.config.ntor_server_id.clone(), diff --git a/reverse-proxy/src/handler/proxy/handler.rs b/reverse-proxy/src/handler/proxy/handler.rs index 9134363..0b78a5a 100644 --- a/reverse-proxy/src/handler/proxy/handler.rs +++ b/reverse-proxy/src/handler/proxy/handler.rs @@ -1,14 +1,14 @@ use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use reqwest::header::HeaderMap; use pingora_router::handler::{APIHandlerResponse, DefaultHandlerTrait, ResponseBodyTrait}; -use log::{debug, error}; use ntor::common::NTorParty; use ntor::server::NTorServer; use reqwest::Client; use pingora::http::StatusCode; +use tracing::{debug, error, info}; use utils::bytes_to_json; use utils::jwt::JWTClaims; -use crate::handler::common::consts::{HeaderKeys}; +use crate::handler::common::consts::{HeaderKeys, LogTypes}; use crate::handler::common::types::ErrorResponse; use crate::handler::proxy::{EncryptedMessage, L8ResponseObject, L8RequestObject}; @@ -21,15 +21,15 @@ impl ProxyHandler { fn validate_jwt_token( ctx: &mut Layer8Context, - header_key: HeaderKeys, + header_key: &str, jwt_secret: &Vec ) -> Result { - match ctx.get_request_header().get(header_key.as_str()) { + match ctx.get_request_header().get(header_key) { None => { return Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, body: Some(ErrorResponse { - error: format!("Missing {} header", header_key.as_str()), + error: format!("Missing {} header", header_key.to_string()), }.to_bytes()), }); }, @@ -38,7 +38,7 @@ impl ProxyHandler { return Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, body: Some(ErrorResponse { - error: format!("Empty {} header", header_key.as_str()), + error: format!("Empty {} header", header_key.to_string()), }.to_bytes()), }); } @@ -47,7 +47,13 @@ impl ProxyHandler { match utils::jwt::verify_jwt_token(token, jwt_secret) { Ok(data) => Ok(data.claims), Err(err) => { - error!("Error verifying {} token: {:?}", header_key.as_str(), err); + error!( + correlation_id=ctx.get_correlation_id(), + log_type=LogTypes::HANDLE_PROXY_REQUEST, + "Error verifying {} token: {:?}", + header_key, + err + ); Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, body: Some(ErrorResponse { @@ -67,14 +73,14 @@ impl ProxyHandler { ) -> Result { // verify fp_rp_jwt header - match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::FpRpJwtKey, jwt_secret) { + match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::FP_RP_JWT, jwt_secret) { Ok(_claims) => { // todo!() nothing to validate at the moment } Err(err) => return Err(err) } - return match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::IntRpJwtKey, jwt_secret) { + return match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::INT_RP_JWT_KEY, jwt_secret) { Ok(claims) => { // extract ntor_session_id from claims match claims.ntor_session_id { @@ -95,6 +101,8 @@ impl ProxyHandler { ctx: &mut Layer8Context ) -> Result { + let correlation_id = ctx.get_correlation_id(); + match ProxyHandler::parse_request_body::< EncryptedMessage, ErrorResponse @@ -104,7 +112,12 @@ impl ProxyHandler { let body = match err { None => None, Some(err_response) => { - error!("Error parsing request body: {}", err_response.error); + error!( + %correlation_id, + log_type=LogTypes::HANDLE_PROXY_REQUEST, + "Error parsing request body: {}", + err_response.error + ); Some(err_response.to_bytes()) } }; @@ -150,19 +163,31 @@ impl ProxyHandler { } pub(crate) async fn rebuild_user_request( + ctx: &Layer8Context, backend_url: String, wrapped_request: L8RequestObject ) -> Result { + let correlation_id = ctx.get_correlation_id(); let header_map = utils::hashmap_to_headermap(&wrapped_request.headers) .unwrap_or_else(|_| HeaderMap::new()); - debug!("[FORWARD {}] Reconstructed request headers: {:?}", wrapped_request.uri, header_map); + debug!( + %correlation_id, + log_type=LogTypes::HANDLE_PROXY_REQUEST, + backend_url=backend_url.as_str(), + "Reconstructed request headers: {:?}", + header_map + ); let origin_url = format!("{}{}", backend_url, wrapped_request.uri); - debug!("[FORWARD {}] Request URL: {}", wrapped_request.uri, origin_url); let client = Client::new(); - + info!( + %correlation_id, + log_type=LogTypes::HANDLE_PROXY_REQUEST, + "Send reconstructed request to origin backend URL: {}", + origin_url + ); let response = client.request( wrapped_request.method.parse().unwrap_or_default(), origin_url.as_str(), @@ -186,15 +211,12 @@ impl ProxyHandler { let serialized_headers = utils::headermap_to_hashmap(&success_res.headers()); let serialized_body = success_res.bytes().await.unwrap_or_default().to_vec(); - debug!( - "[FORWARD {}] Response from backend headers: {:?}", - wrapped_request.uri, - serialized_headers - ); - debug!( - "[FORWARD {}] Response from backend body: {}", - wrapped_request.uri, - utils::bytes_to_string(&serialized_body) + info!( + %correlation_id, + log_type=LogTypes::HANDLE_BACKEND_RESPONSE, + "Received response from backend: status={}, url={}", + status, + url.as_str() ); Ok(L8ResponseObject { @@ -208,7 +230,12 @@ impl ProxyHandler { }) } Err(err) => { - error!("[FORWARD] Error while building request to BE: {:?}", err); + error!( + %correlation_id, + log_type=LogTypes::HANDLE_PROXY_REQUEST, + "Error while building request to BE: {:?}", + err + ); let status = err.status().unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR); let err_body = ErrorResponse { error: format!("Backend error: {}", status), diff --git a/reverse-proxy/src/main.rs b/reverse-proxy/src/main.rs index dfa4b42..fe31a78 100644 --- a/reverse-proxy/src/main.rs +++ b/reverse-proxy/src/main.rs @@ -2,9 +2,7 @@ mod handler; mod proxy; mod tls_conf; -use std::fs::OpenOptions; use crate::handler::ReverseHandler; -use env_logger::{self, Env, Target}; use futures::FutureExt; use pingora::server::Server; use pingora::server::configuration::Opt; @@ -12,7 +10,7 @@ use pingora::{listeners::tls::TlsSettings, prelude::http_proxy_service}; use pingora_router::handler::APIHandler; use pingora_router::router::Router; use std::sync::Arc; -use log::{debug, error}; +use tracing::{debug, error}; use crate::config::RPConfig; use crate::proxy::ReverseProxy; @@ -27,28 +25,7 @@ fn load_config() -> RPConfig { error!("Failed to load configuration: {}", e); }).unwrap(); - let target = match config.log.log_path.as_str() { - "console" => Target::Stdout, - path => { - let file = OpenOptions::new() - .append(true) - .create(true) - .open(path) - .expect("Can't create log file!"); - - Target::Pipe(Box::new(file)) - } - }; - - env_logger::Builder::from_env(Env::default() - .write_style_or("RUST_LOG_STYLE", "always")) - .format_file(true) - .format_line_number(true) - .filter(None, config.log.to_level_filter()) - .target(target) - .init(); - - debug!("Loaded ReverseProxyConfig: {:?}", config); + debug!(name: "RPConfig", value = ?config); config } @@ -56,6 +33,13 @@ fn main() { // Load environment variables from .env file let rp_config = load_config(); + let _logger_guard = utils::log::init_logger( + rp_config.log.log_level.clone(), + rp_config.log.log_format.clone(), + rp_config.log.log_path.clone(), + rp_config.log.log_filename.clone(), + ); + let mut my_server = Server::new(Some(Opt { conf: std::env::var("SERVER_CONF").ok(), ..Default::default() diff --git a/reverse-proxy/src/proxy.rs b/reverse-proxy/src/proxy.rs index 0b11655..00a137c 100644 --- a/reverse-proxy/src/proxy.rs +++ b/reverse-proxy/src/proxy.rs @@ -1,11 +1,12 @@ use pingora::prelude::{HttpPeer, ProxyHttp}; use pingora::proxy::Session; use pingora::http::{ResponseHeader, StatusCode}; -use log::{error, info}; use async_trait::async_trait; use bytes::Bytes; +use tracing::{debug, info}; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; use pingora_router::router::Router; +use crate::handler::common::consts::LogTypes; pub struct ReverseProxy { router: Router @@ -45,9 +46,13 @@ impl ReverseProxy { .insert_header("Access-Control-Max-Age", "86400") .unwrap_or_default(); - println!(); - info!("[RESPONSE {} {}] Header: {:?}", session.req_header().method, - session.req_header().uri.to_string(), header.headers); + let correlation_id = ctx.get_correlation_id(); + info!( + %correlation_id, + log_type=LogTypes::HANDLE_BACKEND_RESPONSE, + "Response Headers: {:?}", + header.headers + ); session.write_response_header_ref(&header).await } } @@ -77,12 +82,19 @@ impl ProxyHttp for ReverseProxy { { // create Context ctx.update(session).await?; + + let correlation_id = ctx.set_correlation_id(); + + info!( + %correlation_id, + log_type=LogTypes::ACCESS_LOG, + request_summary = session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), + user_agent = ctx.request.header.get("User-Agent"), + ); + ctx.read_request_body(session).await?; - let request_summary = session.request_summary(); - println!(); - info!("[REQUEST {}] {:?}", request_summary, ctx.request); - info!("[REQUEST {}] Decoded body: {}", request_summary, String::from_utf8_lossy(&*ctx.get_request_body())); - println!(); let handler_response = self.router.call_handler(ctx).await; if handler_response.status == StatusCode::NOT_FOUND && handler_response.body.is_none() { @@ -99,9 +111,6 @@ impl ProxyHttp for ReverseProxy { }; ReverseProxy::::set_headers(session, ctx, handler_response.status).await?; - info!("[RESPONSE {}] Body: {}", request_summary, String::from_utf8_lossy(&*response_bytes)); - println!(); - // Write the response body to the session after setting headers session.write_response_body(Some(Bytes::from(response_bytes)), true).await?; @@ -114,21 +123,24 @@ impl ProxyHttp for ReverseProxy { e: Option<&pingora::Error>, ctx: &mut Self::CTX, ) { - let response_code = session - .response_written() - .map_or(0, |resp| resp.status.as_u16()); - - if !e.is_none() { - error!( - "{} error: {}", - self.request_summary(session, ctx), - e.as_ref().unwrap_or_default() - ); + let mut status = ctx.response.status.as_u16(); + if let Some(_err) = e { + status = session.response_written().unwrap().status.as_u16(); } + let correlation_id = ctx.get_correlation_id(); + info!( - "{} response code: {response_code}", - self.request_summary(session, ctx) + %correlation_id, + log_type=LogTypes::ACCESS_LOG_RESULT, + request_summary=session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), + status=status, + latency_ms=ctx.get_latency_ms(), // todo: is it necessary? + response_body_size=ctx.get_response_body().len(), + user_agent=ctx.request.header.get("User-Agent"), + error=?e, ); } } diff --git a/reverse-proxy/src/tls_conf.rs b/reverse-proxy/src/tls_conf.rs index fea85c1..64d0bf7 100644 --- a/reverse-proxy/src/tls_conf.rs +++ b/reverse-proxy/src/tls_conf.rs @@ -2,9 +2,10 @@ use boring::{ pkey::{PKey, Public}, ssl::{SslAlert, SslRef, SslVerifyError, SslVerifyMode}, }; -use log::debug; use pingora::{listeners::TlsAccept, protocols::tls::TlsRef}; use serde::Deserialize; +use tracing::{debug, error, info}; +use crate::handler::common::consts::LogTypes; #[derive(Debug, Deserialize, Clone)] pub struct TlsConfig { @@ -21,7 +22,10 @@ impl TlsAccept for TlsConfig { // set the hostname for the SSL context ssl.set_hostname("localhost") .inspect_err(|e| { - log::error!("Failed to set hostname: {}", e); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Failed to set hostname: {}", e + ); }) .unwrap(); @@ -29,12 +33,18 @@ impl TlsAccept for TlsConfig { { let key = PKey::private_key_from_pem(&self.key.clone().into_bytes()) .inspect_err(|e| { - log::error!("Failed to parse server private key: {}", e); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Failed to parse server private key: {}", e + ); }) .unwrap(); ssl.set_private_key(&key) .inspect_err(|e| { - log::error!("Failed to set server private key: {}", e); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Failed to set server private key: {}", e + ); }) .unwrap(); } @@ -43,13 +53,19 @@ impl TlsAccept for TlsConfig { { let cert = boring::x509::X509::from_pem(&self.cert.clone().into_bytes()) .inspect_err(|e| { - log::error!("Failed to parse server certificate: {}", e); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Failed to parse server certificate: {}", e + ); }) .unwrap(); ssl.set_certificate(&cert) .inspect_err(|e| { - log::error!("Failed to set server certificate: {}", e); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Failed to set server certificate: {}", e + ); }) .unwrap(); } @@ -57,13 +73,16 @@ impl TlsAccept for TlsConfig { // the CA certificate is used to verify the client certificate let ca_cert = boring::x509::X509::from_pem(&self.ca_cert.clone().into_bytes()) .inspect_err(|e| { - log::error!("Failed to parse CA certificate: {}", e); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Failed to parse CA certificate: {}", e + ); }) .unwrap(); ssl.set_custom_verify_callback( SslVerifyMode::PEER, - Self::verify_callback(ca_cert.public_key().unwrap_or_default()), + Self::verify_callback(ca_cert.public_key().unwrap()), ); } } @@ -82,14 +101,20 @@ impl TlsConfig { ssl: &mut TlsRef, ) -> Result<(), SslVerifyError> { if ssl.verify_mode() != SslVerifyMode::PEER { - log::error!("SSL verify mode is not set to PEER, cannot verify client certificate"); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "SSL verify mode is not set to PEER, cannot verify client certificate" + ); return Err(SslVerifyError::Invalid(SslAlert::INTERNAL_ERROR)); } let client_cert = match ssl.peer_certificate() { Some(val) => val, None => { - log::error!("Failed to get client certificate"); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Failed to get client certificate" + ); return Err(SslVerifyError::Invalid(SslAlert::NO_CERTIFICATE)); } }; @@ -99,25 +124,17 @@ impl TlsConfig { // Verify the client certificate against the server's CA if !client_cert.verify(&server_ca_public_key).unwrap_or_default() { - log::error!("Client certificate verification failed"); + error!( + log_type=LogTypes::TLS_HANDSHAKE, + "Client certificate verification failed" + ); return Err(SslVerifyError::Invalid(SslAlert::BAD_CERTIFICATE)); } - debug!("Client certificate verification succeeded"); + info!( + log_type=LogTypes::TLS_HANDSHAKE, + "Client certificate verification succeeded" + ); Ok(()) } } - -mod cert { - pub fn ca_pem(ca_path: String) -> Vec { - std::fs::read(ca_path).expect("Failed to read CA PEM file") - } - - pub fn cert(cert_path: String) -> Vec { - std::fs::read(cert_path).expect("Failed to read cert PEM file") - } - - pub fn key(key_path: String) -> Vec { - std::fs::read(key_path).expect("Failed to read key PEM file") - } -} diff --git a/utils/Cargo.toml b/utils/Cargo.toml index ce69d8b..6eb22a5 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -9,10 +9,13 @@ uuid = { version = "1.16.0", features = ["v4"] } serde_json = "1.0.140" serde = { version = "1.0.219", features = ["derive"] } base64 = "0.21.7" -log = "0.4.27" chrono = "0.4.40" jsonwebtoken = "9.3.1" url = "2.5.4" pem = "3.0.5" x509-parser = "0.17.0" hex = "0.4.3" +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter", "fmt"] } +tracing-appender = "0.2.3" + diff --git a/utils/src/cert.rs b/utils/src/cert.rs index 95f40da..822944f 100644 --- a/utils/src/cert.rs +++ b/utils/src/cert.rs @@ -7,18 +7,9 @@ pub fn extract_x509_pem(pem: String) -> Result, Box) -> HeaderMap { let mut header_map = HeaderMap::new(); @@ -132,4 +133,12 @@ pub fn headermap_to_hashmap(headers: &HeaderMap) -> HashMap Option { Url::parse(url).ok() +} + +pub fn get_socket_addrs(url: &Url) -> String { + url.socket_addrs(|| None).unwrap_or_default() + .iter() + .map(|addr| addr.to_string()) + .collect::>() + .join(",") } \ No newline at end of file diff --git a/utils/src/log.rs b/utils/src/log.rs new file mode 100644 index 0000000..94ec078 --- /dev/null +++ b/utils/src/log.rs @@ -0,0 +1,59 @@ +use tracing_appender::non_blocking::WorkerGuard; +use tracing_appender::rolling; +use tracing_subscriber::filter::LevelFilter; +use tracing_subscriber::fmt; +use tracing_subscriber::fmt::writer::BoxMakeWriter; + +pub fn init_logger( + level: String, + log_format: String, + log_folder: String, + log_file: String, +) -> Option { + let level_filter = to_level_filter(level); + + // Dynamic writer + let (writer, guard): (BoxMakeWriter, Option) = + if log_folder == "console" { + let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout()); + (BoxMakeWriter::new(non_blocking), Some(guard)) + } else { + let file_appender = rolling::daily(log_folder, log_file); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + (BoxMakeWriter::new(non_blocking), Some(guard)) + }; + + // Structured JSON logger + let builder = fmt::Subscriber::builder() + .with_max_level(level_filter) + .with_target(true) + .with_file(true) // ✅ include file path + .with_line_number(true) // ✅ include line number + .with_ansi(true) + .with_writer(writer); + + if log_format.to_lowercase() != "plain" { + builder + .json() + .with_current_span(true) + .flatten_event(true) + .init(); + } else { + builder.compact().init(); + }; + + tracing::info!("Logger initialized"); + guard +} + +fn to_level_filter(level: String) -> LevelFilter { + match level.to_uppercase().as_str() { + "INFO" => LevelFilter::INFO, + "DEBUG" => LevelFilter::DEBUG, + "WARNING" => LevelFilter::WARN, + "ERROR" => LevelFilter::ERROR, + "TRACE" => LevelFilter::TRACE, + "OFF" => LevelFilter::OFF, + _ => LevelFilter::INFO, + } +}