diff --git a/.gitignore b/.gitignore index 847c65c..c970dc3 100644 --- a/.gitignore +++ b/.gitignore @@ -28,7 +28,7 @@ test/cypress/screenshots **/Cargo.lock -docker/layer8-volumes/influxdb2-data +docker/influxdb2-data docker/layer8-volumes/pg-data certs/ntor/*.pem diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 684f117..b38d5d4 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -53,7 +53,7 @@ services: depends_on: auth-postgres: condition: service_healthy - auth-influxdb2: + influxdb2: condition: service_healthy auth-telegraf: condition: service_healthy @@ -64,14 +64,22 @@ services: layer8-network: ipv4_address: 10.10.10.103 - auth-influxdb2: - container_name: auth-influxdb2 + influxdb2: + container_name: influxdb2 image: influxdb:2 restart: always ports: - 8086:8086 volumes: - - ./layer8-volumes/influxdb2-data:/var/lib/influxdb2 + - ./influxdb2-data:/var/lib/influxdb2 + environment: + - DOCKER_INFLUXDB_INIT_MODE=setup + - DOCKER_INFLUXDB_INIT_USERNAME=admin + - DOCKER_INFLUXDB_INIT_PASSWORD=12341234 + - DOCKER_INFLUXDB_INIT_ORG=globeandcitizen + - DOCKER_INFLUXDB_INIT_BUCKET=layer8-bucket + - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=layer8-admin-token + - DOCKER_INFLUXDB_INIT_RETENTION=30d healthcheck: test: [ "CMD", "curl", "-f", "http://localhost:8086/health" ] interval: 5s @@ -135,7 +143,7 @@ services: depends_on: auth-postgres: condition: service_healthy - auth-influxdb2: + influxdb2: condition: service_healthy auth-telegraf: condition: service_healthy diff --git a/docker/layer8-volumes/.env.docker b/docker/layer8-volumes/.env.docker index eec8182..0387a41 100644 --- a/docker/layer8-volumes/.env.docker +++ b/docker/layer8-volumes/.env.docker @@ -22,9 +22,9 @@ INFLUXDB_URL=http://10.10.10.104:8086 INFLUXDB_URL_TELEGRAF=http://host.docker.internal:8086 INFLUXDB_USERNAME=admin INFLUXDB_PASSWORD=somethingthatyoudontknow -INFLUXDB_ORG=layer8 -INFLUXDB_BUCKET=layer8 -INFLUXDB_TOKEN=DEFAULT_TOKEN_FOR_TESTING +INFLUXDB_ORG=globeandcitizen +INFLUXDB_BUCKET=layer8-bucket +INFLUXDB_TOKEN=layer8-admin-token OTEL_EXPORTER_OTLP_ENDPOINT=10.10.10.105:4317 CREATE_TEST_USER=true diff --git a/forward-proxy/.env.dev b/forward-proxy/.env.dev index 653a539..5b1546f 100644 --- a/forward-proxy/.env.dev +++ b/forward-proxy/.env.dev @@ -24,3 +24,8 @@ CA_CERT="-----BEGIN CERTIFICATE-----\nMIIF0zCCA7ugAwIBAgIUQbJs4Jhd2NiSeL0Y+SZ4z7 CERT="-----BEGIN CERTIFICATE-----\nMIIFtTCCA52gAwIBAgIUXKLNUFsfUc3xSpmCKj35dAP3oe8wDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MTAyNVoXDTM1MDkyMjE5\nMTAyNVowdDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRYwFAYDVQQDDA1mb3J3YXJkLXByb3h5MIICIjANBgkqhkiG9w0BAQEFAAOC\nAg8AMIICCgKCAgEAq4JLgH1rmbSe2/6VFydzdxGsq0oDE9cKOagWxdd9lFqen+LL\nB+vDFV6d0n+CHDkhLsgi5ASGhSl5o232FI/Tgf4YqHQ0C3touGs2ysuI51WT2hzV\nPOFHemgveHNKk+yVHabZp5ypm4kYgMegknhFAtKytAC4g6IcHUX+t9gz+ucA0D8C\n+kuYJiaVd2IML8rF4RCGruZ6WuOcLeHF7byfucT/r9tT3IhdbHtLWOOw6mmtRuAl\nUexiXN+ipxmaAgSyAZrR9CwOo2xp9p5XL2Mn3VKHa6AGZya8QobnN16Dm+JOCoIl\n/C8b7w2H9mC6/GA2kmdxuD0+MyWFnK9DrbT+VYBgubXHeyKAR9mddn0ur7WNBeMl\ntNKufyS9J0Od3S8dERjw05Fl7jGJJj82B1zR/bKbq3v3IIteeTHm6sea2fV0vIw1\n2MBUF5ax60Jdeo71vHf4CVg01F3LBb9ar1LRVNgvrqThach6CDbvHPICXEFHV9OP\nil8PHIgUwon5iySfzfjPxIF0EnkGUR7B5iFYlcG1vSvbSW30sGQ67a9RWBfIll1m\nON1PhLZ5rnslgj6KxNReheaLSe/u4f17QxMrwLw/W3vwOm2SymUcMpwCAaLYtxd+\n6W6zaKy9g0wXeUaEbKBzaOdLqixJ4mL6NVYjEqlXJ5BuQ25iauafpRJz/vcCAwEA\nAaNCMEAwHQYDVR0OBBYEFHrJpAogeNfUj4GhwcrHVYMzXhMeMB8GA1UdIwQYMBaA\nFJOQTjwIZb078bHcHzxNnkUiCKW1MA0GCSqGSIb3DQEBCwUAA4ICAQBdQuPBEGma\nU4puro0xFlpIeOVOUf7eVMHeJuBCiDmBr8C+kae9XuShRYP/OMgGxQ7Y95BJo/C7\nTMwF5ugDQrsRod4EaTEO8ZXGcZq91v4YfegAJ9gQpR/OdniO5dDxfPMlY/VZEyK9\nwHIaUVUP+fiWk9yDwuTPZz+0xF3x2oujBO0JZTsHAbGaZ0CtNW1M5JwFF6aVD5Fs\nPave34wfDEbUg/7WKnZy6pNoxlkgdEsS1QThQ817pE9tTpGJS+9r+JjTSg9AL/W9\nQTvBB96WUsrRqJ2V+XXZ6FjDzPmWC4cfABh24S1NIdygB6uUJ6WLg8HN9tIjH+6O\noDvxCBjUE1444O6c9Z6cdxBvdICrIRN9wc1WhspBqRf4nXtRidFyrV2KDW9TMQXZ\nMOGXrhqgC1lwtKTc2iT8x74qQe8ARSArKNeTjJEiGwbc7+9wG8lf0b0zKWxwlTmg\nmAy8jVEkHmLvkZqKmaW59wUxkaAiBO2TNwjiwDkwPY8ET6DyGd/x4oG6QdocKF87\naOetHnMjTSpUj01sKFpveyTqoAF0UNilmJsUE2AO0BHsxj7Pe+ysLx1yUPiuKJX2\nUMHK/obbGgwu/5BGDXoafQlYu77ZyL/4oqnGYk/s7IGJaw/yVOrmGuuaF0h8RJfV\nOYxd3wTVC8r46A49VExTqvuYdwaAiovdaw==\n-----END CERTIFICATE-----" KEY="-----BEGIN PRIVATE KEY-----\nMIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQCrgkuAfWuZtJ7b\n/pUXJ3N3EayrSgMT1wo5qBbF132UWp6f4ssH68MVXp3Sf4IcOSEuyCLkBIaFKXmj\nbfYUj9OB/hiodDQLe2i4azbKy4jnVZPaHNU84Ud6aC94c0qT7JUdptmnnKmbiRiA\nx6CSeEUC0rK0ALiDohwdRf632DP65wDQPwL6S5gmJpV3YgwvysXhEIau5npa45wt\n4cXtvJ+5xP+v21PciF1se0tY47Dqaa1G4CVR7GJc36KnGZoCBLIBmtH0LA6jbGn2\nnlcvYyfdUodroAZnJrxChuc3XoOb4k4KgiX8LxvvDYf2YLr8YDaSZ3G4PT4zJYWc\nr0OttP5VgGC5tcd7IoBH2Z12fS6vtY0F4yW00q5/JL0nQ53dLx0RGPDTkWXuMYkm\nPzYHXNH9spure/cgi155Mebqx5rZ9XS8jDXYwFQXlrHrQl16jvW8d/gJWDTUXcsF\nv1qvUtFU2C+upOFpyHoINu8c8gJcQUdX04+KXw8ciBTCifmLJJ/N+M/EgXQSeQZR\nHsHmIViVwbW9K9tJbfSwZDrtr1FYF8iWXWY43U+EtnmueyWCPorE1F6F5otJ7+7h\n/XtDEyvAvD9be/A6bZLKZRwynAIBoti3F37pbrNorL2DTBd5RoRsoHNo50uqLEni\nYvo1ViMSqVcnkG5DbmJq5p+lEnP+9wIDAQABAoICACmQw1GZk9lFf/abJXDeG8qw\nmuNMZaCKTi0ZAqPiDMpGiAkBwujhh38HVkJsqpDCe7tFv8b5Hczp91PXU3s6PC1V\n8o3o07AwsXl4amgNmdlO0S1cLYW6p0MQOuj7MAjXnm/4PumzOxu5xxl2yACXa0o6\n3Bppzk4AnMWvcAMIP9i/4V+W1dbpOS+NzE2JkqCGiRx5j9qVevPKE9C+1eQ/AYrZ\nJoptIk7hMZsX6nPZgsfc4qS5r/HB0zjk7huHRd7VWnqvFdESWF3c6XVefIy8gC3Q\nUYeQ2dxn89o/rYuquUSvPPCpCCGtHRz7b4cTfF2rx64FqfbXyNpGbrJBe6p+oeCb\nRnB0vnEmiEIvw4C0P0rGz9nOS69TaphTfRFVmDOFgwYDNzZz3DD/YdsQY1XMQ6pH\n+m4Nr7+EPjFk6AYSgvbmRLL8WGGI6Tv9/ezVvdflPClYLMjOGIyVPRfOldgad01T\nlI/vWDHITWJZqHwh4VBZvQaUeoRPcuabMeBIV3d7FBcsQqOwRbpbL/Piu8ydZJ8Y\n+5gUYfh/F7ouJGyuFm77R2uEhSjpbJMxu1Mku2SqG2gAime5YJBRxExUAXky9QyO\nXic4XxX67iFA4P3ATkFA5kQF3m+1uuwfYC3gIs+aiZuAuQuZ4OO5Ay3TxFCMeuTk\nfsS+COfEhsdXUG7Wn2aZAoIBAQDYClEGkfeftTvyXYbISRGeG/Nni3UUS4aNSIsv\nRNlAKnD+n60VwiBpOTr6IEP1MB1eseQvvNVqiemb5Oa8jaDV8Gkon0uBHTBQo62K\nFIRMsM8jsMGvO0RRtd/2iWRu1WnzqzA5TXX2sN/whjxeT0DUrnGKsqUI6PT8STjj\nQo/IHT7uulJPnbVZ+G+CZyQKjaBaOOy6VMYsDXhMm71zNQkM2AgzOYSLwpqtSAHM\nwHWHGkZGvetsVL6hHcLcWVxkuo7saeMcLMPfYBLkV8oX/HayZ0oc+x2RYslQwokq\ng7oFEsvQjb166MXD5gSaOjAkjb9FshBsxU5XKC3gA1xqbOo1AoIBAQDLO2Izu1Nu\nhsJ4UMWgfeYSf9/i8T5CXGYs9VytYNCFpBc530RQ4olu+SY9vrUt3+OW3xeDtWX/\nMm/pq0DxpjkvD5y5fYnsBI2Cd3o3L6WKEsqKnBOJbS5omrcwWSOO18z+8+BF9O55\n+RTR8dWHw+XxvajFTqzcLojtvExyvRwKI3WVaNtZWgU7YmsVxoQ93/PIYIKgdbhI\nv+tbc8DuxnbDA+qJhFipuTI07fclE/hupmgmd2sDiFjiyPbk85dXpLsxnAhlE9ZT\n2YSBhVQz0a68CRiVUcoWv7FsMKlgJtfICmzDH3qrB00Dp7FhXinlubrgMWy+kw5v\n6k/h2WqZhIn7AoIBAQDMOPRfQZzTXH9OnRrMOkZtL/7n4uzKQpru86SfCnZUMcqe\n6FK2Psxkq7UUvWuAW/tniMIsXlVgYP50X+2+UCO6GYlO3UaCxxTlJdTmsn5eAMXO\n90ggXeY3V7ZfV4GZRCTkMu9jO9ZHXOxUcpCelkyywDSU6EsaIR11X8JnEoTYpszW\n30rv+CV252KB4v4u+7KZlzYw7fJnslQGFzL/tSLZAV6/DaA+fbe6FledNlHjZPMJ\n7H6f6XxK0ddidRbiIXj6Ax6tg6OlhSxWrqZcBkwuWXW176wDw16K+Vqw1dUC9sG0\nZEi551EL3mR7ZoYcB+LH/4uHRvzHZzP2jzbNZCgBAoIBAQCEuVN41W23UOrQCHAI\nUDBhBIICg+pVDGLuGY9c601C+dbxRI4pBMkcYDpJOLK6Mu0/KpMAwQbLkvTjdaQE\nLLpLsbZ4rTPVn2OLQNvgDo3djkgYHbXkmhkk12WrfYtrTiPinQJqrXrQzYp7UaRR\n9e3F4kbGFItvgDSMjdyfUkFtnZq86K3XvKKOFcg5gFv8zLU4t06X3EltuWjLYN0v\nEw2cboJNLNF6hifzyTUOUex81tBNzs9kjzb9ZKFZBHxiEILv8ybIXBwsxnFy5NAI\nx3eF9arIWZHRKX+FWIJE+RkS2zwMchJ6f1ocePeuzwAttw4EPEL4crGLBUsGBCdJ\n+vThAoIBAQC1KrUB170d08W6WMdK8sbzL67M5pj8B44yp6650nNAVJhXMRuw846X\nH1KqUzV1elQdmvKJDogkUYllRVByHGX05Lr5BGAkmDxBMqYxUn56IhEfQ18ejy6x\nzUFwg4hLLWZzNqkIwGNb5B9wFzHFy39vwIlsy1mMIswxjucsH/lENRaPF2Lr5Tia\n1VRDJqXSZ2ytGyKsgBhFXsZUXmnxBNmjFuI62ROhOlrYJ3bwfTRDwkqQQWxPDj7f\nfTK7R5D69flh4CBEM6oSIZm7KEMXW/C5AkYc/jjW3/bDNuVy7JpZ/aokTxHCqW8e\n7zyoZqd4bAj7xvPuj8lMdThXtKSz86hK\n-----END PRIVATE KEY-----" +# influxDB +INFLUXDB_URL=http://localhost:8086 +INFLUXDB_ORG=layer8 +INFLUXDB_BUCKET=layer8 +INFLUXDB_AUTH_TOKEN=DEFAULT_TOKEN_FOR_TESTING diff --git a/forward-proxy/.env.docker b/forward-proxy/.env.docker index faf8372..9b03c53 100644 --- a/forward-proxy/.env.docker +++ b/forward-proxy/.env.docker @@ -7,7 +7,8 @@ PATH_TO_SERVER_CONF="../server_conf.yml" # not yet used LOG_LEVEL=trace # default to "json" if not "plain" LOG_FORMAT=plain -# "console" or folder path +# can be set to 'console' or a directory path +#LOG_PATH=/var/log/layer8 LOG_PATH=console # leave empty if LOG_PATH is console LOG_FILENAME=forward-proxy.log @@ -23,3 +24,9 @@ ENABLE_TLS=true CA_CERT="-----BEGIN CERTIFICATE-----\nMIIF0zCCA7ugAwIBAgIUQbJs4Jhd2NiSeL0Y+SZ4z7W6l7swDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MDQ1OFoXDTM1MDkyMjE5\nMDQ1OFowcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRMwEQYDVQQDDAptVExTUm9vdENBMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A\nMIICCgKCAgEAmQsqgvtgfiQvhaziGzWMAZRzCViScRCrwkvmeE2U6Q/etk1PacuH\ndkJAvDQgLBsXCThg6G0hjw6BHX/i5seBQ6XHBNXEhAR70X3Y9VDCOoOLbqI22XAH\nZrX5gswBUzXbxxbxPrc7lK8N8s2M/LS4ADo9QWu4gtDnfr2eEWQR1P7o8llIn3z+\nGJStvzD0Jlfa39Gv9sIjPsoe7hvSpDiYqwGYDpeBLxn2SU39tyyyxDOH5NrpHG0z\nYMh20kK/0qkiVknd7/oqbSIv1EKfLLnw7p09LmsaHhlR+w+kQt6Z2Dzvae1qMj/D\nWnf1fgBOThZdnfUDznFGLjhPpGIvyjB8zk16wgYW7OIslqqbPPSLWm3ls3qpU0pl\nn2ROOWSLufXobh0Ly8vLLGVN8mwYeaTyXK4ELC0ScrpzBJbQ0mN3xulCMulk54iZ\nLy8KA1DtiwWyyV1cCvb36kfEPavU9zCYUgjvpVp3+fato0DBqe4ukMBUlb5C9qQ0\nKEkz/z75yWAe6bSVqXnYVuIihoxumM6oRMJwfOeOU0yyIfIkhwmDxw+yqeUDVibw\nseq5Ul4qVulocSAYPUX4ym/rPqC0SI+Z3EJZSLbiwjh1jmMiLbrsguYROSIOFrVO\noyYmjCUnc5Dxs04Q8VK80NuDFxCVEaK19esxFzCcFYDTiXoocem6EccCAwEAAaNj\nMGEwHQYDVR0OBBYEFJOQTjwIZb078bHcHzxNnkUiCKW1MB8GA1UdIwQYMBaAFJOQ\nTjwIZb078bHcHzxNnkUiCKW1MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQD\nAgGGMA0GCSqGSIb3DQEBCwUAA4ICAQAyCYD4IShYuU66OQAPz0/P4rVUr+mfKX0s\nYhnWRY+hLm4M0zp51/H9XGnHUZwGf3No3I0vWFDZHZShKt/GemCZy/WOJzjDGH71\ncNraMm/Bn2c7RXEqzK0qaKdgVo4vrS9inGExGtTRywEc+fLgH+myVR+Dy6JfxpeW\nFarMpu5AyoAWYRwUVaEQXotBF/XZHetaYr1mk9p3qNQkTEnv8zOWFF5Sv5R0I3XM\nABqk5Lz2WyRJIIL0g/9TdWZF88KrQhnr1q9JnTgAg3aTvyTR8c+Oik+BlRUe1zw1\n7TUvuVJkR50uEKZ0IVdWGtNCEzPl1PPMxzUAomozhZRh7vhuBk+yThaDByinaSvC\nxVzufk3hjrnPA0+ZyD4nVcMs3oivl5Vxli/KJrdYiQoErkbTmEgDCzSm0g/14BvJ\nHmlpITgolQGA1SFQCJcrioYn1Eyv4hSrnQtwP+t3t/G4OHbbEeu2pgqvZKEMngNL\naUGla0fwJtytoAqspWxuFwYxJWl5cse1xpsbrf5aiDphqCfSZARr8jzeYtYF8sex\n/8yzy1YmgDlSLng0kFqjLgP8SHLEgKMCOCP/haAHBaDqIu/9FMPofqlG0YJuqTB3\ndf0zohxxcIDZn17qElOVCRoUQVoz2rbYKHHZ8/MVWM7lPmUIqt2G6HlcMo1xYUa4\nfgCQMqEubg==\n-----END CERTIFICATE-----" CERT="-----BEGIN CERTIFICATE-----\nMIIFtTCCA52gAwIBAgIUXKLNUFsfUc3xSpmCKj35dAP3oe8wDQYJKoZIhvcNAQEL\nBQAwcTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5GcmFu\nY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5ZXI4\nMRMwEQYDVQQDDAptVExTUm9vdENBMB4XDTI1MDkyNDE5MTAyNVoXDTM1MDkyMjE5\nMTAyNVowdDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRUwEwYDVQQHDAxTYW5G\ncmFuY2lzY28xGDAWBgNVBAoMD0dsb2JlQW5kQ2l0aXplbjEPMA0GA1UECwwGTGF5\nZXI4MRYwFAYDVQQDDA1mb3J3YXJkLXByb3h5MIICIjANBgkqhkiG9w0BAQEFAAOC\nAg8AMIICCgKCAgEAq4JLgH1rmbSe2/6VFydzdxGsq0oDE9cKOagWxdd9lFqen+LL\nB+vDFV6d0n+CHDkhLsgi5ASGhSl5o232FI/Tgf4YqHQ0C3touGs2ysuI51WT2hzV\nPOFHemgveHNKk+yVHabZp5ypm4kYgMegknhFAtKytAC4g6IcHUX+t9gz+ucA0D8C\n+kuYJiaVd2IML8rF4RCGruZ6WuOcLeHF7byfucT/r9tT3IhdbHtLWOOw6mmtRuAl\nUexiXN+ipxmaAgSyAZrR9CwOo2xp9p5XL2Mn3VKHa6AGZya8QobnN16Dm+JOCoIl\n/C8b7w2H9mC6/GA2kmdxuD0+MyWFnK9DrbT+VYBgubXHeyKAR9mddn0ur7WNBeMl\ntNKufyS9J0Od3S8dERjw05Fl7jGJJj82B1zR/bKbq3v3IIteeTHm6sea2fV0vIw1\n2MBUF5ax60Jdeo71vHf4CVg01F3LBb9ar1LRVNgvrqThach6CDbvHPICXEFHV9OP\nil8PHIgUwon5iySfzfjPxIF0EnkGUR7B5iFYlcG1vSvbSW30sGQ67a9RWBfIll1m\nON1PhLZ5rnslgj6KxNReheaLSe/u4f17QxMrwLw/W3vwOm2SymUcMpwCAaLYtxd+\n6W6zaKy9g0wXeUaEbKBzaOdLqixJ4mL6NVYjEqlXJ5BuQ25iauafpRJz/vcCAwEA\nAaNCMEAwHQYDVR0OBBYEFHrJpAogeNfUj4GhwcrHVYMzXhMeMB8GA1UdIwQYMBaA\nFJOQTjwIZb078bHcHzxNnkUiCKW1MA0GCSqGSIb3DQEBCwUAA4ICAQBdQuPBEGma\nU4puro0xFlpIeOVOUf7eVMHeJuBCiDmBr8C+kae9XuShRYP/OMgGxQ7Y95BJo/C7\nTMwF5ugDQrsRod4EaTEO8ZXGcZq91v4YfegAJ9gQpR/OdniO5dDxfPMlY/VZEyK9\nwHIaUVUP+fiWk9yDwuTPZz+0xF3x2oujBO0JZTsHAbGaZ0CtNW1M5JwFF6aVD5Fs\nPave34wfDEbUg/7WKnZy6pNoxlkgdEsS1QThQ817pE9tTpGJS+9r+JjTSg9AL/W9\nQTvBB96WUsrRqJ2V+XXZ6FjDzPmWC4cfABh24S1NIdygB6uUJ6WLg8HN9tIjH+6O\noDvxCBjUE1444O6c9Z6cdxBvdICrIRN9wc1WhspBqRf4nXtRidFyrV2KDW9TMQXZ\nMOGXrhqgC1lwtKTc2iT8x74qQe8ARSArKNeTjJEiGwbc7+9wG8lf0b0zKWxwlTmg\nmAy8jVEkHmLvkZqKmaW59wUxkaAiBO2TNwjiwDkwPY8ET6DyGd/x4oG6QdocKF87\naOetHnMjTSpUj01sKFpveyTqoAF0UNilmJsUE2AO0BHsxj7Pe+ysLx1yUPiuKJX2\nUMHK/obbGgwu/5BGDXoafQlYu77ZyL/4oqnGYk/s7IGJaw/yVOrmGuuaF0h8RJfV\nOYxd3wTVC8r46A49VExTqvuYdwaAiovdaw==\n-----END CERTIFICATE-----" KEY="-----BEGIN PRIVATE KEY-----\nMIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQCrgkuAfWuZtJ7b\n/pUXJ3N3EayrSgMT1wo5qBbF132UWp6f4ssH68MVXp3Sf4IcOSEuyCLkBIaFKXmj\nbfYUj9OB/hiodDQLe2i4azbKy4jnVZPaHNU84Ud6aC94c0qT7JUdptmnnKmbiRiA\nx6CSeEUC0rK0ALiDohwdRf632DP65wDQPwL6S5gmJpV3YgwvysXhEIau5npa45wt\n4cXtvJ+5xP+v21PciF1se0tY47Dqaa1G4CVR7GJc36KnGZoCBLIBmtH0LA6jbGn2\nnlcvYyfdUodroAZnJrxChuc3XoOb4k4KgiX8LxvvDYf2YLr8YDaSZ3G4PT4zJYWc\nr0OttP5VgGC5tcd7IoBH2Z12fS6vtY0F4yW00q5/JL0nQ53dLx0RGPDTkWXuMYkm\nPzYHXNH9spure/cgi155Mebqx5rZ9XS8jDXYwFQXlrHrQl16jvW8d/gJWDTUXcsF\nv1qvUtFU2C+upOFpyHoINu8c8gJcQUdX04+KXw8ciBTCifmLJJ/N+M/EgXQSeQZR\nHsHmIViVwbW9K9tJbfSwZDrtr1FYF8iWXWY43U+EtnmueyWCPorE1F6F5otJ7+7h\n/XtDEyvAvD9be/A6bZLKZRwynAIBoti3F37pbrNorL2DTBd5RoRsoHNo50uqLEni\nYvo1ViMSqVcnkG5DbmJq5p+lEnP+9wIDAQABAoICACmQw1GZk9lFf/abJXDeG8qw\nmuNMZaCKTi0ZAqPiDMpGiAkBwujhh38HVkJsqpDCe7tFv8b5Hczp91PXU3s6PC1V\n8o3o07AwsXl4amgNmdlO0S1cLYW6p0MQOuj7MAjXnm/4PumzOxu5xxl2yACXa0o6\n3Bppzk4AnMWvcAMIP9i/4V+W1dbpOS+NzE2JkqCGiRx5j9qVevPKE9C+1eQ/AYrZ\nJoptIk7hMZsX6nPZgsfc4qS5r/HB0zjk7huHRd7VWnqvFdESWF3c6XVefIy8gC3Q\nUYeQ2dxn89o/rYuquUSvPPCpCCGtHRz7b4cTfF2rx64FqfbXyNpGbrJBe6p+oeCb\nRnB0vnEmiEIvw4C0P0rGz9nOS69TaphTfRFVmDOFgwYDNzZz3DD/YdsQY1XMQ6pH\n+m4Nr7+EPjFk6AYSgvbmRLL8WGGI6Tv9/ezVvdflPClYLMjOGIyVPRfOldgad01T\nlI/vWDHITWJZqHwh4VBZvQaUeoRPcuabMeBIV3d7FBcsQqOwRbpbL/Piu8ydZJ8Y\n+5gUYfh/F7ouJGyuFm77R2uEhSjpbJMxu1Mku2SqG2gAime5YJBRxExUAXky9QyO\nXic4XxX67iFA4P3ATkFA5kQF3m+1uuwfYC3gIs+aiZuAuQuZ4OO5Ay3TxFCMeuTk\nfsS+COfEhsdXUG7Wn2aZAoIBAQDYClEGkfeftTvyXYbISRGeG/Nni3UUS4aNSIsv\nRNlAKnD+n60VwiBpOTr6IEP1MB1eseQvvNVqiemb5Oa8jaDV8Gkon0uBHTBQo62K\nFIRMsM8jsMGvO0RRtd/2iWRu1WnzqzA5TXX2sN/whjxeT0DUrnGKsqUI6PT8STjj\nQo/IHT7uulJPnbVZ+G+CZyQKjaBaOOy6VMYsDXhMm71zNQkM2AgzOYSLwpqtSAHM\nwHWHGkZGvetsVL6hHcLcWVxkuo7saeMcLMPfYBLkV8oX/HayZ0oc+x2RYslQwokq\ng7oFEsvQjb166MXD5gSaOjAkjb9FshBsxU5XKC3gA1xqbOo1AoIBAQDLO2Izu1Nu\nhsJ4UMWgfeYSf9/i8T5CXGYs9VytYNCFpBc530RQ4olu+SY9vrUt3+OW3xeDtWX/\nMm/pq0DxpjkvD5y5fYnsBI2Cd3o3L6WKEsqKnBOJbS5omrcwWSOO18z+8+BF9O55\n+RTR8dWHw+XxvajFTqzcLojtvExyvRwKI3WVaNtZWgU7YmsVxoQ93/PIYIKgdbhI\nv+tbc8DuxnbDA+qJhFipuTI07fclE/hupmgmd2sDiFjiyPbk85dXpLsxnAhlE9ZT\n2YSBhVQz0a68CRiVUcoWv7FsMKlgJtfICmzDH3qrB00Dp7FhXinlubrgMWy+kw5v\n6k/h2WqZhIn7AoIBAQDMOPRfQZzTXH9OnRrMOkZtL/7n4uzKQpru86SfCnZUMcqe\n6FK2Psxkq7UUvWuAW/tniMIsXlVgYP50X+2+UCO6GYlO3UaCxxTlJdTmsn5eAMXO\n90ggXeY3V7ZfV4GZRCTkMu9jO9ZHXOxUcpCelkyywDSU6EsaIR11X8JnEoTYpszW\n30rv+CV252KB4v4u+7KZlzYw7fJnslQGFzL/tSLZAV6/DaA+fbe6FledNlHjZPMJ\n7H6f6XxK0ddidRbiIXj6Ax6tg6OlhSxWrqZcBkwuWXW176wDw16K+Vqw1dUC9sG0\nZEi551EL3mR7ZoYcB+LH/4uHRvzHZzP2jzbNZCgBAoIBAQCEuVN41W23UOrQCHAI\nUDBhBIICg+pVDGLuGY9c601C+dbxRI4pBMkcYDpJOLK6Mu0/KpMAwQbLkvTjdaQE\nLLpLsbZ4rTPVn2OLQNvgDo3djkgYHbXkmhkk12WrfYtrTiPinQJqrXrQzYp7UaRR\n9e3F4kbGFItvgDSMjdyfUkFtnZq86K3XvKKOFcg5gFv8zLU4t06X3EltuWjLYN0v\nEw2cboJNLNF6hifzyTUOUex81tBNzs9kjzb9ZKFZBHxiEILv8ybIXBwsxnFy5NAI\nx3eF9arIWZHRKX+FWIJE+RkS2zwMchJ6f1ocePeuzwAttw4EPEL4crGLBUsGBCdJ\n+vThAoIBAQC1KrUB170d08W6WMdK8sbzL67M5pj8B44yp6650nNAVJhXMRuw846X\nH1KqUzV1elQdmvKJDogkUYllRVByHGX05Lr5BGAkmDxBMqYxUn56IhEfQ18ejy6x\nzUFwg4hLLWZzNqkIwGNb5B9wFzHFy39vwIlsy1mMIswxjucsH/lENRaPF2Lr5Tia\n1VRDJqXSZ2ytGyKsgBhFXsZUXmnxBNmjFuI62ROhOlrYJ3bwfTRDwkqQQWxPDj7f\nfTK7R5D69flh4CBEM6oSIZm7KEMXW/C5AkYc/jjW3/bDNuVy7JpZ/aokTxHCqW8e\n7zyoZqd4bAj7xvPuj8lMdThXtKSz86hK\n-----END PRIVATE KEY-----" + +# influxDB +INFLUXDB_URL=http://localhost:8086 +INFLUXDB_ORG=layer8 +INFLUXDB_BUCKET=layer8 +INFLUXDB_AUTH_TOKEN=DEFAULT_TOKEN_FOR_TESTING diff --git a/forward-proxy/Cargo.toml b/forward-proxy/Cargo.toml index 4d2b400..d32c48a 100644 --- a/forward-proxy/Cargo.toml +++ b/forward-proxy/Cargo.toml @@ -10,7 +10,7 @@ clap = { version = "3.2.25", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" chrono = "0.4.40" -reqwest = { version="0.11", default-features=false, features=["json", "rustls-tls"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } tokio = { version = "1.44.2", features = ["rt-multi-thread", "macros"] } pingora = { version = "0.5.0", features = ["lb", "boringssl"] } jsonwebtoken = "9.3.1" @@ -24,3 +24,5 @@ utils = { path = "../utils", version = "0.1.0" } hex = "0.4.3" envy = "0.4.2" tracing = "0.1.41" +influxdb2 = { version = "0.5.2", default-features = false, features = ["rustls"] } + diff --git a/forward-proxy/Dockerfile b/forward-proxy/Dockerfile index 2b9affd..7aaaf26 100644 --- a/forward-proxy/Dockerfile +++ b/forward-proxy/Dockerfile @@ -20,7 +20,6 @@ RUN apt-get update && apt-get install -y \ WORKDIR /usr/src/app # Copy source code -COPY ./certs ./certs COPY ./pingora-router ./pingora-router COPY ./utils ./utils COPY ./forward-proxy ./forward-proxy @@ -44,7 +43,6 @@ RUN useradd -m layer8 # Copy only the built binary from the builder stage COPY --from=builder /usr/src/app/forward-proxy/target/release/forward-proxy /usr/local/bin/forward-proxy -COPY --from=builder /usr/src/app/certs /usr/local/certs # Switch to non-root user USER layer8 diff --git a/forward-proxy/src/config.rs b/forward-proxy/src/config.rs index 4894f8d..34cf625 100644 --- a/forward-proxy/src/config.rs +++ b/forward-proxy/src/config.rs @@ -11,7 +11,9 @@ pub struct FPConfig { #[serde(flatten)] pub tls_config: TlsConfig, #[serde(flatten)] // This flattens the HandlerConfig fields into this struct - pub handler_config: HandlerConfig + pub handler_config: HandlerConfig, + #[serde(flatten)] + pub influxdb_config: InfluxDBConfig, } #[derive(Debug, Deserialize)] @@ -42,4 +44,12 @@ pub struct TlsConfig { pub ca_cert: String, pub cert: String, pub key: String, +} + +#[derive(Debug, Deserialize)] +pub struct InfluxDBConfig { + pub influxdb_url: String, + pub influxdb_org: String, + pub influxdb_bucket: String, + pub influxdb_auth_token: String, } \ No newline at end of file diff --git a/forward-proxy/src/handler/consts.rs b/forward-proxy/src/handler/consts.rs index 2ec617d..9253f6e 100644 --- a/forward-proxy/src/handler/consts.rs +++ b/forward-proxy/src/handler/consts.rs @@ -1,48 +1,28 @@ -// can be replaced by constants, will see -pub enum HeaderKeys { - #[allow(dead_code)] - IntRpJwtKey, - IntFpJwtKey, - FpRpJwtKey -} +pub struct HeaderKeys; impl HeaderKeys { - pub fn as_str(&self) -> &'static str { - match self { - HeaderKeys::IntRpJwtKey => "int_rp_jwt", - HeaderKeys::IntFpJwtKey => "int_fp_jwt", - HeaderKeys::FpRpJwtKey => "fp_rp_jwt", - } - } -} - -pub enum CtxKeys { - NTorServerId, - NTorStaticPublicKey, - UpstreamAddress, - UpstreamSNI, #[allow(dead_code)] - IntRPJwt, - #[allow(dead_code)] - IntFPJwt, - FpRpJwt, + pub const INT_RP_JWT: &'static str = "int_rp_jwt"; + pub const INT_FP_JWT: &'static str = "int_fp_jwt"; + pub const FP_RP_JWT: &'static str = "fp_rp_jwt"; } +pub struct CtxKeys; + impl CtxKeys { - pub fn to_string(&self) -> String { - match self { - CtxKeys::NTorServerId => "ntor_server_id".to_string(), - CtxKeys::NTorStaticPublicKey => "ntor_static_public_key".to_string(), - CtxKeys::UpstreamAddress => "upstream_address".to_string(), - CtxKeys::UpstreamSNI => "upstream_sni".to_string(), - CtxKeys::IntRPJwt => "int_rp_jwt".to_string(), - CtxKeys::IntFPJwt => "int_fp_jwt".to_string(), - CtxKeys::FpRpJwt => "fp_rp_jwt".to_string(), - } - } + pub const NTOR_SERVER_ID: &'static str = "ntor_server_id"; + pub const NTOR_STATIC_PUBLIC_KEY: &'static str = "ntor_static_public_key"; + pub const UPSTREAM_ADDRESS: &'static str = "upstream_address"; + pub const UPSTREAM_SNI: &'static str = "upstream_sni"; + #[allow(dead_code)] + pub const INT_RP_JWT: &'static str = "int_rp_jwt"; + #[allow(dead_code)] + pub const INT_FP_JWT: &'static str = "int_fp_jwt"; + pub const FP_RP_JWT: &'static str = "fp_rp_jwt"; + pub const BACKEND_AUTH_CLIENT_ID: &'static str = "backend_auth_client_id"; } -pub struct LogTypes {} +pub struct LogTypes; impl LogTypes { pub const ACCESS_LOG: &'static str = "ACCESS_LOG"; @@ -51,4 +31,15 @@ impl LogTypes { pub const HANDLE_CLIENT_REQUEST: &'static str = "HANDLE_CLIENT_REQUEST"; pub const HANDLE_UPSTREAM_RESPONSE: &'static str = "HANDLE_UPSTREAM_RESPONSE"; pub const HEALTHCHECK: &'static str = "HEALTHCHECK"; -} \ No newline at end of file + pub const INFLUXDB: &'static str = "INFLUXDB"; + pub const AUTHENTICATION_SERVER: &'static str = "AUTHENTICATION_SERVER"; +} + +pub struct RequestPaths; + +impl RequestPaths { + pub const PROXY: &'static str = "/proxy"; + pub const INIT_TUNNEL: &'static str = "/init-tunnel"; + pub const HEALTHCHECK: &'static str = "/healthcheck"; +} + diff --git a/forward-proxy/src/handler/mod.rs b/forward-proxy/src/handler/mod.rs index 86cbff8..9c4805c 100644 --- a/forward-proxy/src/handler/mod.rs +++ b/forward-proxy/src/handler/mod.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex}; use pingora::http::StatusCode; use reqwest::Client; use pingora_router::{ - ctx::{Layer8Context, Layer8ContextTrait}, + ctx::{Layer8Context, Layer8ContextTrait}, handler::{APIHandlerResponse, DefaultHandlerTrait, RequestBodyTrait, ResponseBodyTrait} }; use serde::Deserialize; @@ -23,7 +23,7 @@ pub mod consts; pub struct ForwardHandler { pub config: HandlerConfig, - jwts_storage: Arc>>, + jwts_storage: Arc>>, // int_fp_jwt -> IntFPSession } impl DefaultHandlerTrait for ForwardHandler {} @@ -36,6 +36,7 @@ struct NTorServerCertificate { #[derive(Clone, Debug, Default)] pub struct IntFPSession { + pub client_id: String, pub rp_base_url: String, pub fp_rp_jwt: String, } @@ -54,6 +55,8 @@ impl ForwardHandler { ctx: &mut Layer8Context, ) -> Result { + let correlation_id = ctx.get_correlation_id(); + let client = Client::new(); //todo @@ -87,7 +90,8 @@ impl ForwardHandler { error: format!("Failed to get public key from layer8, status code: {}", res.status().as_u16()), }; error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, "Failed to get ntor certificate for {request_path}: {response_body:?}" ); @@ -101,11 +105,13 @@ impl ForwardHandler { #[derive(Deserialize, Debug)] struct AuthServerResponse { pub x509_certificate: String, + pub client_id: String, } - let cert: AuthServerResponse = res.json().await.map_err(|err| { + let auth_res: AuthServerResponse = res.json().await.map_err(|err| { error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, "Failed to parse authentication server response: {:?}", err ); @@ -115,10 +121,14 @@ impl ForwardHandler { } })?; - let pub_key = utils::cert::extract_x509_pem(cert.x509_certificate.clone()) + // save `client_id` to ctx for later use + ctx.set(consts::CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string(), auth_res.client_id.clone()); + + let pub_key = utils::cert::extract_x509_pem(auth_res.x509_certificate.clone()) .map_err(|e| { error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, "Failed to parse x509 certificate: {:?}", e ); @@ -128,9 +138,10 @@ impl ForwardHandler { } })?; - debug!("AuthenticationServer response: {:?}", cert); + debug!(%correlation_id, "AuthenticationServer response: {:?}", auth_res); info!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, + %correlation_id, + log_type=LogTypes::AUTHENTICATION_SERVER, "Obtained ntor credentials for backend_url: {}", backend_url ); @@ -200,11 +211,11 @@ impl ForwardHandler { debug!("Server certificate: {:?}", server_certificate); ctx.set( - consts::CtxKeys::NTorServerId.to_string(), + consts::CtxKeys::NTOR_SERVER_ID.to_string(), server_certificate.server_id, ); ctx.set( - consts::CtxKeys::NTorStaticPublicKey.to_string(), + consts::CtxKeys::NTOR_STATIC_PUBLIC_KEY.to_string(), hex::encode(server_certificate.public_key), ); } @@ -216,9 +227,9 @@ impl ForwardHandler { } pub fn handle_init_tunnel_response(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { - let ntor_server_id = ctx.get(&consts::CtxKeys::NTorServerId.to_string()).unwrap_or(&"".to_string()).clone(); + let ntor_server_id = ctx.get(&consts::CtxKeys::NTOR_SERVER_ID.to_string()).unwrap_or(&"".to_string()).clone(); let ntor_static_public_key = hex::decode( - ctx.get(&consts::CtxKeys::NTorStaticPublicKey.to_string()).clone().unwrap_or(&"".to_string()) + ctx.get(&consts::CtxKeys::NTOR_STATIC_PUBLIC_KEY.to_string()).clone().unwrap_or(&"".to_string()) ).unwrap_or_default(); let response_body = ctx.get_response_body(); @@ -226,6 +237,7 @@ impl ForwardHandler { return match utils::bytes_to_json::(response_body) { Err(e) => { error!( + correlation_id=ctx.get_correlation_id(), log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE, "Error parsing RP response: {:?}", e @@ -243,6 +255,7 @@ impl ForwardHandler { }; let int_fp_session = IntFPSession { + client_id: ctx.get(&consts::CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string()).unwrap_or(&"".to_string()).to_string(), rp_base_url: ctx.param("backend_url").unwrap_or(&"".to_string()).to_string(), fp_rp_jwt: res_from_rp.fp_rp_jwt, }; diff --git a/forward-proxy/src/main.rs b/forward-proxy/src/main.rs index 1523908..0b0e18a 100644 --- a/forward-proxy/src/main.rs +++ b/forward-proxy/src/main.rs @@ -1,12 +1,15 @@ mod proxy; mod handler; mod config; +mod statistics; use crate::handler::ForwardHandler; use proxy::ForwardProxy; use pingora::prelude::*; +use tokio::runtime::Runtime; use crate::config::FPConfig; use tracing::{info, debug}; +use crate::statistics::Statistics; fn load_config() -> FPConfig { // Load environment variables from .env file @@ -21,6 +24,12 @@ fn load_config() -> FPConfig { fn main() { let config = load_config(); + // let influxdb_client = InfluxDBClient::new(&config.influxdb_config); + + // Initialize the async runtime + let rt = Runtime::new().unwrap(); + rt.block_on(Statistics::init_influxdb_client(&config.influxdb_config)); + let _logger_guard = utils::log::init_logger( config.log_config.log_level.clone(), diff --git a/forward-proxy/src/proxy.rs b/forward-proxy/src/proxy.rs index 484752b..fcec0b8 100644 --- a/forward-proxy/src/proxy.rs +++ b/forward-proxy/src/proxy.rs @@ -1,3 +1,8 @@ +use crate::config::TlsConfig; +use crate::handler::ForwardHandler; +use crate::handler::consts::{CtxKeys, HeaderKeys, LogTypes, RequestPaths}; +use crate::handler::types::response::ErrorResponse; +use crate::statistics::Statistics; use async_trait::async_trait; use boring::x509::X509; use bytes::Bytes; @@ -8,17 +13,13 @@ use pingora::listeners::tls::TLS_CONF_ERR; use pingora::prelude::{HttpPeer, ProxyHttp, Session}; use pingora::upstreams::peer::PeerOptions; use pingora::utils::tls::CertKey; +use pingora_error::ErrorType; use pingora_router::ctx::{Layer8Context, Layer8ContextTrait}; +use pingora_router::handler::ResponseBodyTrait; use reqwest::header::TRANSFER_ENCODING; use std::sync::Arc; use std::time::Duration; -use pingora_error::ErrorType; use tracing::{debug, error, info}; -use pingora_router::handler::{ResponseBodyTrait}; -use crate::config::TlsConfig; -use crate::handler::consts::{HeaderKeys, LogTypes}; -use crate::handler::{consts, ForwardHandler}; -use crate::handler::types::response::ErrorResponse; pub struct ForwardProxy { tls_config: TlsConfig, @@ -62,41 +63,55 @@ impl ProxyHttp for ForwardProxy { // // Code below is for step 4(this is a client to RP), presenting the client's TLS certificate. - let addrs = ctx.get(&consts::CtxKeys::UpstreamAddress.to_string()).unwrap_or(&"".to_string()).clone(); - let sni = ctx.get(&consts::CtxKeys::UpstreamSNI.to_string()).unwrap_or(&"".to_string()).clone(); + let correlation_id = ctx.get_correlation_id(); + + let addrs = ctx + .get(&CtxKeys::UPSTREAM_ADDRESS.to_string()) + .unwrap_or(&"".to_string()) + .clone(); + let sni = ctx + .get(&CtxKeys::UPSTREAM_SNI.to_string()) + .unwrap_or(&"".to_string()) + .clone(); info!( - log_type=LogTypes::UPSTREAM_CONNECT, - addresses=addrs, - sni=sni + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, + addresses = addrs, + sni = sni ); // HttpPeer cannot connect to upstream without a valid socket(IP:PORT) address. // A dns name can resolve to multiple socket addresses. // We will try to connect to each address until one succeeds. let mut address_list: Vec<&str> = addrs.split(',').collect(); + let enable_tls = self.tls_config.enable_tls; // clone for move into closure + let upstream_sni = sni.to_string(); // clone for move into closure let mut opt_peer = None; for addr in address_list.clone() { - match std::panic::catch_unwind(|| { - HttpPeer::new(addr, self.tls_config.enable_tls, sni.to_string()) - }) { + match std::panic::catch_unwind(|| HttpPeer::new(addr, enable_tls, upstream_sni.clone())) + { Ok(p) => { info!( - log_type=LogTypes::UPSTREAM_CONNECT, - "Created HttpPeer for addr: {}", - addr + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, + "Created HttpPeer for addr: {}", addr ); opt_peer = Some(p); break; } Err(err) => { error!( - log_type=LogTypes::UPSTREAM_CONNECT, + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, "Panic occurred while creating HttpPeer for addr: {}, error: {:?}", addr, err ); address_list.retain(|&x| x != addr); - ctx.set(consts::CtxKeys::UpstreamAddress.to_string(), address_list.join(",")); + ctx.set( + CtxKeys::UPSTREAM_ADDRESS.to_string(), + address_list.join(","), + ); } } } @@ -105,7 +120,8 @@ impl ProxyHttp for ForwardProxy { Some(p) => p, None => { error!( - log_type=LogTypes::UPSTREAM_CONNECT, + %correlation_id, + log_type = LogTypes::UPSTREAM_CONNECT, "Failed to create HttpPeer for any socket address" ); return Err(Error::new(ErrorType::ConnectError)); @@ -119,8 +135,9 @@ impl ProxyHttp for ForwardProxy { let ca_cert = X509::from_pem(&self.tls_config.ca_cert.clone().into_bytes()) .or_err(TLS_CONF_ERR, "Failed to load CA certificate")?; - let key = boring::pkey::PKey::private_key_from_pem(&self.tls_config.key.clone().into_bytes()) - .or_err(TLS_CONF_ERR, "Failed to load private key")?; + let key = + boring::pkey::PKey::private_key_from_pem(&self.tls_config.key.clone().into_bytes()) + .or_err(TLS_CONF_ERR, "Failed to load private key")?; // The certificate to present in mTLS connections to upstream // The organization implementing mTLS acts as its own certificate authority. @@ -151,16 +168,15 @@ impl ProxyHttp for ForwardProxy { { // create Context ctx.update(session).await?; - let request_id = session.req_header().headers.get("x-request-id").map(|v| v.to_str().unwrap_or("")).unwrap_or(""); + let correlation_id = ctx.set_correlation_id(); + info!( - log_type=LogTypes::ACCESS_LOG, - client_ip=ctx.request.summary.host, - request_summary=session.request_summary(), - // status=resp.status, - // duration_ms=duration_ms, - // bytes_out=resp.body.len(), - user_agent=ctx.request.header.get("User-Agent"), - request_id=request_id, + %correlation_id, + log_type = LogTypes::ACCESS_LOG, + request_summary = session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), + user_agent = ctx.request.header.get("User-Agent"), ); match session.req_header().method { @@ -181,9 +197,9 @@ impl ProxyHttp for ForwardProxy { let mut error_response_bytes: Vec = vec![]; match ( session.req_header().uri.path(), - session.req_header().method.as_str() + session.req_header().method.as_str(), ) { - ("/healthcheck", "GET") => { + (RequestPaths::HEALTHCHECK, "GET") => { let handler_response = self.handler.handle_healthcheck(ctx); let mut header = ResponseHeader::build(handler_response.status, None)?; let response_headers = header.headers.clone(); @@ -192,14 +208,16 @@ impl ProxyHttp for ForwardProxy { .insert_header(key.clone(), val.clone()) .map_err(|e| { error!( - log_type=LogTypes::HEALTHCHECK, + %correlation_id, + log_type = LogTypes::HEALTHCHECK, "Cannot add request header {}:{:?}, err: {:?}", key.clone(), val.clone(), e ) - }).unwrap_or_default(); - }; + }) + .unwrap_or_default(); + } let mut response_bytes = vec![]; if let Some(body_bytes) = handler_response.body { @@ -211,12 +229,6 @@ impl ProxyHttp for ForwardProxy { session.write_response_header_ref(&header).await?; - debug!( - log_type=LogTypes::HEALTHCHECK, - request_summary=session.request_summary(), - response_body=utils::bytes_to_string(&response_bytes) - ); - // Write the response body to the session after setting headers session .write_response_body(Some(Bytes::from(response_bytes)), true) @@ -224,69 +236,67 @@ impl ProxyHttp for ForwardProxy { return Ok(true); } - ("/init-tunnel", "POST") => { + (RequestPaths::INIT_TUNNEL, "POST") => { if let Some(url) = ctx.param("backend_url") { if let Some(url) = utils::validate_url(url) { let socket_addr = utils::get_socket_addrs(&url); + ctx.set(CtxKeys::UPSTREAM_ADDRESS.to_string(), socket_addr); ctx.set( - consts::CtxKeys::UpstreamAddress.to_string(), - socket_addr, - ); - ctx.set( - consts::CtxKeys::UpstreamSNI.to_string(), + CtxKeys::UPSTREAM_SNI.to_string(), url.domain().unwrap_or_default().to_string(), ); } else { error_response_bytes = ErrorResponse { - error: "Invalid backend_url".to_string() - }.to_bytes(); + error: "Invalid backend_url".to_string(), + } + .to_bytes(); } } else { error_response_bytes = ErrorResponse { - error: "backend_url is a required param".to_string() - }.to_bytes(); + error: "backend_url is a required param".to_string(), + } + .to_bytes(); } } - ("/proxy", "POST") => { - error_response_bytes = match ctx.get_request_header().get(HeaderKeys::IntFpJwtKey.as_str()) { - None => { - ErrorResponse { - error: "Missing int_fp_jwt header".to_string() - }.to_bytes() + (RequestPaths::PROXY, "POST") => { + error_response_bytes = match ctx.get_request_header().get(HeaderKeys::INT_FP_JWT) { + None => ErrorResponse { + error: "Missing int_fp_jwt header".to_string(), } - Some(int_fp_jwt) => { - match self.handler.verify_int_fp_jwt(int_fp_jwt.as_str()) { - Ok(session) => { - debug!("IntFPSession: {:?}", session); - ctx.set(consts::CtxKeys::FpRpJwt.to_string(), session.fp_rp_jwt); - - if let Some(url) = utils::validate_url(&session.rp_base_url) { - let socket_addr = utils::get_socket_addrs(&url); - ctx.set( - consts::CtxKeys::UpstreamAddress.to_string(), - socket_addr, - ); - ctx.set( - consts::CtxKeys::UpstreamSNI.to_string(), - url.domain().unwrap_or_default().to_string(), - ); - vec![] - } else { - ErrorResponse { - error: "Invalid backend_url".to_string() - }.to_bytes() - } - } - Err(err) => { - error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - "Error verifying int_fp_jwt: {}", - err + .to_bytes(), + Some(int_fp_jwt) => match self.handler.verify_int_fp_jwt(int_fp_jwt.as_str()) { + Ok(session) => { + debug!(%correlation_id, "IntFPSession: {:?}", session); + ctx.set(CtxKeys::FP_RP_JWT.to_string(), session.fp_rp_jwt); + ctx.set( + CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string(), + session.client_id, + ); + + if let Some(url) = utils::validate_url(&session.rp_base_url) { + let socket_addr = utils::get_socket_addrs(&url); + ctx.set(CtxKeys::UPSTREAM_ADDRESS.to_string(), socket_addr); + ctx.set( + CtxKeys::UPSTREAM_SNI.to_string(), + url.domain().unwrap_or_default().to_string(), ); - ErrorResponse { error: err }.to_bytes() + vec![] + } else { + ErrorResponse { + error: "Invalid backend_url".to_string(), + } + .to_bytes() } } - } + Err(err) => { + error!( + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + "Error verifying int_fp_jwt: {}", err + ); + ErrorResponse { error: err }.to_bytes() + } + }, } } _ => { @@ -303,7 +313,9 @@ impl ProxyHttp for ForwardProxy { ctx.set_response_body(error_response_bytes.clone()); let header = ResponseHeader::build(StatusCode::BAD_REQUEST, None)?; session.write_response_header_ref(&header).await?; - session.write_response_body(Some(Bytes::from(error_response_bytes)), true).await?; + session + .write_response_body(Some(Bytes::from(error_response_bytes)), true) + .await?; session.set_keepalive(None); return Ok(true); } @@ -328,13 +340,12 @@ impl ProxyHttp for ForwardProxy { } if end_of_stream { - debug!( - request_summary=session.request_summary(), - request_body=utils::bytes_to_string(&ctx.get_request_body()), - ); + let correlation_id = ctx.get_correlation_id(); + info!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - request_summary=session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), "Request Body Received: {} bytes.", &ctx.get_request_body().len() ); @@ -342,11 +353,12 @@ impl ProxyHttp for ForwardProxy { // This is the last chunk, we can process the data now let handler_response = match session.req_header().uri.path() { - "/init-tunnel" => self.handler.handle_init_tunnel_request(ctx).await, + RequestPaths::INIT_TUNNEL => self.handler.handle_init_tunnel_request(ctx).await, _ => { info!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - request_summary=session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), "Forward proxy passing through request body unchanged." ); *body = Some(Bytes::copy_from_slice(ctx.get_request_body().as_slice())); @@ -356,24 +368,28 @@ impl ProxyHttp for ForwardProxy { if handler_response.status != StatusCode::OK { error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - request_summary=session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), "Failed to handle init-tunnel request with status: {}, error: {}", handler_response.status, utils::bytes_to_string(&handler_response.body.unwrap_or_default()) ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(handler_response.status)), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(handler_response.status), + ))); } info!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - request_summary=session.request_summary(), - "Handle init-tunnel Request response with status: {}", handler_response.status, + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Handle init-tunnel Request response with status: {}", + handler_response.status, ); debug!( - request_summary=session.request_summary(), + %correlation_id, + request_summary = session.request_summary(), "Handle init-tunnel response body: {}", utils::bytes_to_string(&handler_response.body.as_ref().unwrap_or(&vec![])) ); @@ -389,71 +405,72 @@ impl ProxyHttp for ForwardProxy { &self, session: &mut Session, upstream_request: &mut RequestHeader, - _ctx: &mut Self::CTX, + ctx: &mut Self::CTX, ) -> pingora::Result<()> where Self::CTX: Send + Sync, { + let correlation_id = ctx.get_correlation_id(); + match session.req_header().uri.path() { - "/proxy" => { - match upstream_request.headers.get(HeaderKeys::IntFpJwtKey.as_str()) { - None => { + RequestPaths::PROXY => match upstream_request.headers.get(HeaderKeys::INT_FP_JWT) { + None => { + error!( + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Missing required header: {}", + HeaderKeys::INT_FP_JWT + ); + + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::BAD_REQUEST), + ))); + } + Some(token) => { + let token_str = token.to_str().or_err( + pingora::ErrorType::InvalidHTTPHeader, + "Invalid header value for token", + )?; + + if token_str.is_empty() { error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - request_summary=session.request_summary(), - "Missing required header: {}", - HeaderKeys::IntFpJwtKey.as_str() + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "{} token is empty", + HeaderKeys::INT_FP_JWT ); - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(StatusCode::BAD_REQUEST)), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::BAD_REQUEST), + ))); } - Some(token) => { - let token_str = token.to_str().or_err( - pingora::ErrorType::InvalidHTTPHeader, - "Invalid header value for token", - )?; - if token_str.is_empty() { + match self.handler.verify_int_fp_jwt(token_str) { + Ok(session) => { + upstream_request + .insert_header(HeaderKeys::FP_RP_JWT, session.fp_rp_jwt) + .unwrap_or_default(); + upstream_request.remove_header(HeaderKeys::INT_FP_JWT); + } + Err(err) => { error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - request_summary=session.request_summary(), - "{} token is empty", - HeaderKeys::IntFpJwtKey.as_str() + %correlation_id, + log_type = LogTypes::HANDLE_CLIENT_REQUEST, + request_summary = session.request_summary(), + "Error verifying {} token: {}", + HeaderKeys::INT_FP_JWT, + err ); - - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus(u16::from(StatusCode::BAD_REQUEST)), + return Err(pingora::Error::explain( + pingora::ErrorType::InvalidHTTPHeader, + err, )); } - - match self.handler.verify_int_fp_jwt(token_str) { - Ok(session) => { - upstream_request - .insert_header(HeaderKeys::FpRpJwtKey.as_str(), session.fp_rp_jwt) - .unwrap_or_default(); - upstream_request.remove_header(HeaderKeys::IntFpJwtKey.as_str()); - } - Err(err) => { - error!( - log_type=LogTypes::HANDLE_CLIENT_REQUEST, - request_summary=session.request_summary(), - "Error verifying {} token: {}", - HeaderKeys::IntFpJwtKey.as_str(), - err - ); - return Err( - pingora::Error::explain( - pingora::ErrorType::InvalidHTTPHeader, - err, - ) - ); - } - } } } - } + }, _ => {} } @@ -464,6 +481,10 @@ impl ProxyHttp for ForwardProxy { .unwrap_or_default(); } + upstream_request + .insert_header("x-correlation-id", correlation_id) + .unwrap_or_default(); + Ok(()) } @@ -472,8 +493,7 @@ impl ProxyHttp for ForwardProxy { _session: &mut Session, upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX, - ) -> pingora::Result<()> - { + ) -> pingora::Result<()> { upstream_response.insert_header("Access-Control-Allow-Origin", "*")?; upstream_response.insert_header("Access-Control-Allow-Methods", "POST")?; upstream_response.insert_header("Access-Control-Allow-Headers", "*")?; @@ -505,25 +525,24 @@ impl ProxyHttp for ForwardProxy { } if end_of_stream { + let correlation_id = ctx.get_correlation_id(); + // This is the last chunk, we can process the data now - debug!( - log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE, - request_summary=session.request_summary(), - body=utils::bytes_to_string(&ctx.get_response_body()), - ); info!( - log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE, - request_summary=session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), "Response Body Received: {} bytes.", &ctx.get_response_body().len(), ); let handler_response = match session.req_header().uri.path() { - "/init-tunnel" => self.handler.handle_init_tunnel_response(ctx), + RequestPaths::INIT_TUNNEL => self.handler.handle_init_tunnel_response(ctx), _ => { info!( - log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE, - request_summary=session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), "Forward proxy passing through response body unchanged." ); *body = Some(Bytes::copy_from_slice(ctx.get_response_body().as_slice())); @@ -533,25 +552,26 @@ impl ProxyHttp for ForwardProxy { if handler_response.status != StatusCode::OK { error!( - log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE, - request_summary=session.request_summary(), + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), "Failed to handle init-tunnel Response response with status: {}, error: {}", handler_response.status, utils::bytes_to_string(&handler_response.body.unwrap_or_default()) ); ctx.response.status = StatusCode::INTERNAL_SERVER_ERROR; - return Err(pingora::Error::new( - pingora::ErrorType::HTTPStatus( - u16::from(StatusCode::INTERNAL_SERVER_ERROR) - ), - )); + return Err(pingora::Error::new(pingora::ErrorType::HTTPStatus( + u16::from(StatusCode::INTERNAL_SERVER_ERROR), + ))); } info!( - log_type=LogTypes::HANDLE_UPSTREAM_RESPONSE, - request_summary=session.request_summary(), - "Handle init-tunnel Response response with status: {}", handler_response.status, + %correlation_id, + log_type = LogTypes::HANDLE_UPSTREAM_RESPONSE, + request_summary = session.request_summary(), + "Handle init-tunnel Response response with status: {}", + handler_response.status, ); let fp_res_body = handler_response.body.as_ref().unwrap_or(&vec![]).clone(); @@ -568,19 +588,48 @@ impl ProxyHttp for ForwardProxy { where Self::CTX: Send + Sync, { + let correlation_id = ctx.get_correlation_id(); + let mut status = ctx.response.status.as_u16(); if let Some(_err) = e { status = session.response_written().unwrap().status.as_u16(); } + + if session.req_header().method.as_str() == "POST" + && (session.req_header().uri.path() == RequestPaths::PROXY + || session.req_header().uri.path() == RequestPaths::INIT_TUNNEL) + { + let client_id = ctx + .get(&CtxKeys::BACKEND_AUTH_CLIENT_ID.to_string()) + .unwrap_or(&"".to_string()) + .clone(); + let request_path = session.req_header().uri.path().to_string(); + let total_byte_transferred = + (ctx.get_request_body().len() + ctx.get_response_body().len()) as i64; + let correlation_id = correlation_id.clone(); + + tokio::spawn(async move { + Statistics::update( + client_id, + correlation_id, + request_path, + total_byte_transferred, + status, + ) + .await; + }); + } + info!( + %correlation_id, log_type=LogTypes::ACCESS_LOG_RESULT, - client_ip=ctx.request.summary.host, request_summary=session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), status=status, - // duration_ms=session.duration_ms(), + latency_ms=ctx.get_latency_ms(), // todo: is it necessary? response_body_size=ctx.get_response_body().len(), user_agent=ctx.request.header.get("User-Agent"), - request_id=session.req_header().headers.get("x-request-id").map(|v| v.to_str().unwrap_or("")).unwrap_or(""), error=?e, ); } @@ -593,9 +642,12 @@ impl ProxyHttp for ForwardProxy { mut e: Box, ) -> Box { let mut retry = false; - if e.etype == ErrorType::ConnectTimedout || e.etype == ErrorType::ConnectError - || e.etype == ErrorType::ConnectRefused { - let mut addrs = ctx.get(&consts::CtxKeys::UpstreamAddress.to_string()) + if e.etype == ErrorType::ConnectTimedout + || e.etype == ErrorType::ConnectError + || e.etype == ErrorType::ConnectRefused + { + let mut addrs = ctx + .get(&CtxKeys::UPSTREAM_ADDRESS.to_string()) .unwrap_or(&"".to_string()) .clone(); @@ -606,12 +658,16 @@ impl ProxyHttp for ForwardProxy { retry = true; addrs = addrs[idx + 1..].to_string(); - ctx.set(consts::CtxKeys::UpstreamAddress.to_string(), addrs); + ctx.set(CtxKeys::UPSTREAM_ADDRESS.to_string(), addrs); } + error!( - log_type=LogTypes::UPSTREAM_CONNECT, + correlation_id = ctx.get_correlation_id(), + log_type = LogTypes::UPSTREAM_CONNECT, "Failed to connect to upstream addr: {}, err: {}, retry: {}", - peer._address.to_string(), e, retry + peer._address.to_string(), + e, + retry ); } e.set_retry(retry); diff --git a/forward-proxy/src/statistics/influxdb_client.rs b/forward-proxy/src/statistics/influxdb_client.rs new file mode 100644 index 0000000..5cff04b --- /dev/null +++ b/forward-proxy/src/statistics/influxdb_client.rs @@ -0,0 +1,120 @@ +use crate::config::InfluxDBConfig; +use crate::handler::consts::RequestPaths; +use crate::statistics::InfluxDBMeasurements; +use futures::stream; +use influxdb2::Client; +use influxdb2::models::DataPoint; +use pingora::http::StatusCode; +use std::error::Error; + +pub struct InfluxDBClient { + client: Client, + bucket: String, +} + +impl InfluxDBClient { + pub fn new(config: &InfluxDBConfig) -> Self { + let influxdb_client = Client::new( + &config.influxdb_url, + &config.influxdb_org, + &config.influxdb_auth_token, + ); + InfluxDBClient { + client: influxdb_client, + bucket: config.influxdb_bucket.clone(), + } + } + + pub async fn update_statistics( + &self, + client_id: String, + request_path: String, + total_byte_transferred: i64, + response_status: u16, + ) -> Result<(), Box> { + self.increase_total_request(&client_id).await?; + + if response_status == StatusCode::OK { + return match request_path.as_str() { + RequestPaths::PROXY => { + self.add_total_byte_transferred(&client_id, total_byte_transferred) + .await?; + + self.increase_total_success(&client_id).await + } + RequestPaths::INIT_TUNNEL => self.increase_total_tunnel_initiated(&client_id).await, + _ => Ok(()), + }; + } + + Ok(()) + } + + async fn update_counter( + &self, + measurement: &str, + client_id: &str, + value: i64, + ) -> Result<(), Box> { + // Create a data point + let point = DataPoint::builder(measurement) + .tag("client_id", client_id) + .field("counter", value) + .build() + .map_err(|e| { + Box::::from(format!( + "Failed to increase counter for {}: {:?}", + measurement, e + )) + })?; + + // Write to bucket + self.client + .write(self.bucket.as_str(), stream::iter(vec![point])) + .await + .map_err(|e| { + Box::::from(format!( + "Failed to write counter for {}: {:?}", + measurement, e + )) + })?; + Ok(()) + } + + async fn add_total_byte_transferred( + &self, + client_id: &str, + bytes_size: i64, + ) -> Result<(), Box> { + self.update_counter( + InfluxDBMeasurements::TOTAL_BYTE_TRANSFERRED, + client_id, + bytes_size, + ) + .await + } + + async fn increase_total_tunnel_initiated( + &self, + client_id: &str, + ) -> Result<(), Box> { + self.update_counter(InfluxDBMeasurements::TOTAL_TUNNEL_INITIATED, client_id, 1) + .await + } + + async fn increase_total_request( + &self, + client_id: &str, + ) -> Result<(), Box> { + self.update_counter(InfluxDBMeasurements::TOTAL_REQUEST, client_id, 1) + .await + } + + async fn increase_total_success( + &self, + client_id: &str, + ) -> Result<(), Box> { + self.update_counter(InfluxDBMeasurements::TOTAL_SUCCESS, client_id, 1) + .await + } +} diff --git a/forward-proxy/src/statistics/mod.rs b/forward-proxy/src/statistics/mod.rs new file mode 100644 index 0000000..ec9f22b --- /dev/null +++ b/forward-proxy/src/statistics/mod.rs @@ -0,0 +1,57 @@ +mod influxdb_client; + +use crate::config::InfluxDBConfig; +use crate::handler::consts::LogTypes; +use crate::statistics::influxdb_client::InfluxDBClient; +use futures::TryFutureExt; +use once_cell::sync::Lazy; +use tokio::sync::Mutex; +use tracing::error; + +struct InfluxDBMeasurements; + +impl InfluxDBMeasurements { + const TOTAL_BYTE_TRANSFERRED: &'static str = "total_byte_transferred"; + const TOTAL_TUNNEL_INITIATED: &'static str = "total_tunnel_initiated"; + const TOTAL_SUCCESS: &'static str = "total_success"; + const TOTAL_REQUEST: &'static str = "total_request"; +} + +static INFLUXDB_CLIENT: Lazy>> = Lazy::new(|| Mutex::new(None)); + +pub struct Statistics; + +impl Statistics { + pub async fn init_influxdb_client(config: &InfluxDBConfig) { + let mut influxdb_client = INFLUXDB_CLIENT.lock().await; + *influxdb_client = Some(InfluxDBClient::new(&config)); + } + + pub async fn update( + client_id: String, + correlation_id: String, + request_path: String, + total_byte_transferred: i64, + response_status: u16, + ) { + let client = INFLUXDB_CLIENT.lock().await; + if let Some(ref influxdb_client) = *client { + influxdb_client + .update_statistics( + client_id, + request_path, + total_byte_transferred, + response_status, + ) + .map_err(|e| { + error!( + %correlation_id, + log_type = LogTypes::INFLUXDB, + "Failed to update statistics: {:?}", e + ); + }) + .await + .ok(); + } + } +} diff --git a/pingora-router/Cargo.toml b/pingora-router/Cargo.toml index 3c4aeff..6be7d3e 100644 --- a/pingora-router/Cargo.toml +++ b/pingora-router/Cargo.toml @@ -8,3 +8,5 @@ futures = "0.3.31" pingora = { version = "0.5.0", features = ["lb", "boringssl"] } serde = "1.0.219" serde_json = "1.0.140" +chrono = "0.4.40" +uuid = "1.16.0" \ No newline at end of file diff --git a/pingora-router/src/ctx.rs b/pingora-router/src/ctx.rs index 45e73f9..1426926 100644 --- a/pingora-router/src/ctx.rs +++ b/pingora-router/src/ctx.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; +use std::time::Instant; use pingora::http::{Method, RequestHeader, StatusCode}; use pingora::proxy::Session; use crate::utils::get_request_body; +use uuid; /* * Each type in this crate serves a specific purpose and may be updated as requirements evolve. @@ -83,7 +85,7 @@ pub struct Layer8ContextResponse { /// This struct is designed to provide a unified interface for accessing and modifying /// request and response data, as well as sharing state across middleware and handlers. /// All fields are private and should be accessed or modified only through dedicated `get` and `set` methods. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct Layer8Context { /// `request`: contains all relevant request information needed for processing and handler access pub request: Layer8ContextRequest, @@ -94,6 +96,18 @@ pub struct Layer8Context { /// during request processing. /// Accessed via `get(&self, key: &str)` and `set(&mut self, key: String, value: String)` methods memory: HashMap, + pub latency_start: Instant, // todo: remove if not needed +} + +impl Default for Layer8Context { + fn default() -> Self { + Self { + request: Default::default(), + response: Default::default(), + memory: Default::default(), + latency_start: Instant::now(), + } + } } impl Layer8Context { @@ -190,6 +204,30 @@ impl Layer8ContextTrait for Layer8Context { fn set_request_summary(&mut self, summary: Layer8ContextRequestSummary) { self.request.summary = summary } + + fn set_correlation_id(&mut self) -> String { + let correlation_id: String; + if let Some(cid) = self.get_request_header().get("x-correlation-id") { + correlation_id = cid.clone(); + } else if let Some(cid) = self.get_request_header().get("x-request-id") { + correlation_id = cid.clone(); + } else { + correlation_id = uuid::Uuid::new_v4().to_string(); + } + + self.set("x-correlation-id".to_string(), correlation_id.clone()); + correlation_id + } + + fn get_correlation_id(&self) -> String { + self.get("x-correlation-id") + .unwrap_or(&"".to_string()) + .clone() + } + + fn get_latency_ms(&self) -> i64 { + self.latency_start.elapsed().as_nanos() as i64 + } } /// This trait appears to be redundant and could potentially be removed, @@ -214,6 +252,9 @@ pub trait Layer8ContextTrait { fn get(&self, key: &str) -> Option<&String>; fn set(&mut self, key: String, value: String); fn set_request_summary(&mut self, summary: Layer8ContextRequestSummary); + fn set_correlation_id(&mut self) -> String; + fn get_correlation_id(&self) -> String; + fn get_latency_ms(&self) -> i64; } /// `Layer8Header` is a type alias for a map of HTTP header key-value pairs used diff --git a/reverse-proxy/.env.docker b/reverse-proxy/.env.docker index f469bed..4e740df 100644 --- a/reverse-proxy/.env.docker +++ b/reverse-proxy/.env.docker @@ -7,7 +7,8 @@ PATH_TO_SERVER_CONF=../server_conf.yml LOG_LEVEL=trace # default to "json" if not "plain" LOG_FORMAT=plain -# "console" or folder path +# can be set to 'console' or a file path +#LOG_PATH=/var/log/layer8 LOG_PATH=console # leave empty if LOG_PATH is console LOG_FILENAME=reverse-proxy.log diff --git a/reverse-proxy/Dockerfile b/reverse-proxy/Dockerfile index 8d7d21f..dd7efd3 100644 --- a/reverse-proxy/Dockerfile +++ b/reverse-proxy/Dockerfile @@ -20,7 +20,6 @@ RUN apt-get update && apt-get install -y \ WORKDIR /usr/src/app # Copy source code -COPY ./certs ./certs COPY ./pingora-router ./pingora-router COPY ./utils ./utils COPY ./reverse-proxy ./reverse-proxy @@ -44,7 +43,6 @@ RUN useradd -m layer8 # Copy only the built binary from the builder stage COPY --from=builder /usr/src/app/reverse-proxy/target/release/reverse-proxy /usr/local/bin/reverse-proxy -COPY --from=builder /usr/src/app/certs /usr/local/certs # Switch to non-root user USER layer8 diff --git a/reverse-proxy/src/handler/common/consts.rs b/reverse-proxy/src/handler/common/consts.rs index 9ec7035..e85559a 100644 --- a/reverse-proxy/src/handler/common/consts.rs +++ b/reverse-proxy/src/handler/common/consts.rs @@ -1,16 +1,8 @@ -// can be replaced by constants, will see -pub enum HeaderKeys { - FpRpJwtKey, - IntRpJwtKey, -} +pub struct HeaderKeys; impl HeaderKeys { - pub fn as_str(&self) -> &'static str { - match self { - HeaderKeys::FpRpJwtKey => "fp_rp_jwt", - HeaderKeys::IntRpJwtKey => "int_rp_jwt" - } - } + pub const FP_RP_JWT: &'static str = "fp_rp_jwt"; + pub const INT_RP_JWT_KEY: &'static str = "int_rp_jwt"; } pub struct LogTypes; diff --git a/reverse-proxy/src/handler/mod.rs b/reverse-proxy/src/handler/mod.rs index 7e6fa09..9a4dec3 100644 --- a/reverse-proxy/src/handler/mod.rs +++ b/reverse-proxy/src/handler/mod.rs @@ -61,6 +61,8 @@ impl ReverseHandler { } pub async fn handle_init_tunnel(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { + let correlation_id = ctx.get_correlation_id(); + // validate request body let request_body = match InitTunnelHandler::validate_request_body( ctx, @@ -69,11 +71,6 @@ impl ReverseHandler { Ok(res) => res, Err(res) => return res }; - debug!( - client_ip=ctx.request.summary.host, - "Parsed body: {:?}", - request_body - ); // todo I think there are prettier ways to use nTor since we are free to modify the nTor crate, but I'm lazy let mut ntor_server = NTorServer::new_with_secret( @@ -116,8 +113,8 @@ impl ReverseHandler { // InitTunnelHandler::send_result_to_be(self.config.backend_url.clone(), true).await; info!( + %correlation_id, log_type=LogTypes::HANDLE_INIT_TUNNEL_REQUEST, - client_ip=ctx.request.summary.host, "Save new nTor session: {}", ntor_session_id ); @@ -133,6 +130,7 @@ impl ReverseHandler { } pub async fn handle_proxy_request(&self, ctx: &mut Layer8Context) -> APIHandlerResponse { + let correlation_id = ctx.get_correlation_id(); // validate request headers (nTor session ID) let session_id = match ProxyHandler::validate_request_headers(ctx, &self.jwt_secret) { @@ -161,17 +159,14 @@ impl ReverseHandler { }; info!( + %correlation_id, log_type=LogTypes::HANDLE_INIT_TUNNEL_REQUEST, - client_ip=ctx.request.summary.host, "Decrypted request body and forward to backend", ); - debug!( - client_ip=ctx.request.summary.host, - "Decrypted request: {:?}", wrapped_request - ); // reconstruct user request let wrapped_response = match ProxyHandler::rebuild_user_request( + ctx, self.config.backend_url.clone(), wrapped_request, ).await { @@ -179,12 +174,6 @@ impl ReverseHandler { Err(res) => return res, }; - debug!( - client_ip=ctx.request.summary.host, - "Wrapped Backend response: {:?}", - wrapped_response - ); - return match ProxyHandler::encrypt_response_body( wrapped_response, self.config.ntor_server_id.clone(), diff --git a/reverse-proxy/src/handler/proxy/handler.rs b/reverse-proxy/src/handler/proxy/handler.rs index 3dace61..0b78a5a 100644 --- a/reverse-proxy/src/handler/proxy/handler.rs +++ b/reverse-proxy/src/handler/proxy/handler.rs @@ -21,15 +21,15 @@ impl ProxyHandler { fn validate_jwt_token( ctx: &mut Layer8Context, - header_key: HeaderKeys, + header_key: &str, jwt_secret: &Vec ) -> Result { - match ctx.get_request_header().get(header_key.as_str()) { + match ctx.get_request_header().get(header_key) { None => { return Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, body: Some(ErrorResponse { - error: format!("Missing {} header", header_key.as_str()), + error: format!("Missing {} header", header_key.to_string()), }.to_bytes()), }); }, @@ -38,7 +38,7 @@ impl ProxyHandler { return Err(APIHandlerResponse { status: StatusCode::BAD_REQUEST, body: Some(ErrorResponse { - error: format!("Empty {} header", header_key.as_str()), + error: format!("Empty {} header", header_key.to_string()), }.to_bytes()), }); } @@ -48,9 +48,10 @@ impl ProxyHandler { Ok(data) => Ok(data.claims), Err(err) => { error!( + correlation_id=ctx.get_correlation_id(), log_type=LogTypes::HANDLE_PROXY_REQUEST, "Error verifying {} token: {:?}", - header_key.as_str(), + header_key, err ); Err(APIHandlerResponse { @@ -72,14 +73,14 @@ impl ProxyHandler { ) -> Result { // verify fp_rp_jwt header - match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::FpRpJwtKey, jwt_secret) { + match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::FP_RP_JWT, jwt_secret) { Ok(_claims) => { // todo!() nothing to validate at the moment } Err(err) => return Err(err) } - return match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::IntRpJwtKey, jwt_secret) { + return match ProxyHandler::validate_jwt_token(ctx, HeaderKeys::INT_RP_JWT_KEY, jwt_secret) { Ok(claims) => { // extract ntor_session_id from claims match claims.ntor_session_id { @@ -100,6 +101,8 @@ impl ProxyHandler { ctx: &mut Layer8Context ) -> Result { + let correlation_id = ctx.get_correlation_id(); + match ProxyHandler::parse_request_body::< EncryptedMessage, ErrorResponse @@ -110,6 +113,7 @@ impl ProxyHandler { None => None, Some(err_response) => { error!( + %correlation_id, log_type=LogTypes::HANDLE_PROXY_REQUEST, "Error parsing request body: {}", err_response.error @@ -159,13 +163,16 @@ impl ProxyHandler { } pub(crate) async fn rebuild_user_request( + ctx: &Layer8Context, backend_url: String, wrapped_request: L8RequestObject ) -> Result { + let correlation_id = ctx.get_correlation_id(); let header_map = utils::hashmap_to_headermap(&wrapped_request.headers) .unwrap_or_else(|_| HeaderMap::new()); debug!( + %correlation_id, log_type=LogTypes::HANDLE_PROXY_REQUEST, backend_url=backend_url.as_str(), "Reconstructed request headers: {:?}", @@ -173,18 +180,13 @@ impl ProxyHandler { ); let origin_url = format!("{}{}", backend_url, wrapped_request.uri); - debug!( - log_type=LogTypes::HANDLE_PROXY_REQUEST, - backend_url=backend_url.as_str(), - "Origin request URL: {}", - origin_url - ); let client = Client::new(); info!( + %correlation_id, log_type=LogTypes::HANDLE_PROXY_REQUEST, - "Send reconstructed request to backend: {}", - origin_url.as_str() + "Send reconstructed request to origin backend URL: {}", + origin_url ); let response = client.request( wrapped_request.method.parse().unwrap_or_default(), @@ -210,16 +212,12 @@ impl ProxyHandler { let serialized_body = success_res.bytes().await.unwrap_or_default().to_vec(); info!( + %correlation_id, log_type=LogTypes::HANDLE_BACKEND_RESPONSE, "Received response from backend: status={}, url={}", status, url.as_str() ); - debug!( - "Response from backend headers: {:?}, body: {}", - serialized_headers, - utils::bytes_to_string(&serialized_body) - ); Ok(L8ResponseObject { status, @@ -233,6 +231,7 @@ impl ProxyHandler { } Err(err) => { error!( + %correlation_id, log_type=LogTypes::HANDLE_PROXY_REQUEST, "Error while building request to BE: {:?}", err diff --git a/reverse-proxy/src/proxy.rs b/reverse-proxy/src/proxy.rs index 1248904..00a137c 100644 --- a/reverse-proxy/src/proxy.rs +++ b/reverse-proxy/src/proxy.rs @@ -46,7 +46,9 @@ impl ReverseProxy { .insert_header("Access-Control-Max-Age", "86400") .unwrap_or_default(); + let correlation_id = ctx.get_correlation_id(); info!( + %correlation_id, log_type=LogTypes::HANDLE_BACKEND_RESPONSE, "Response Headers: {:?}", header.headers @@ -80,22 +82,20 @@ impl ProxyHttp for ReverseProxy { { // create Context ctx.update(session).await?; - ctx.read_request_body(session).await?; - let request_summary = session.request_summary(); - let request_id = session.req_header().headers.get("x-request-id").map(|v| v.to_str().unwrap_or("")).unwrap_or(""); + + let correlation_id = ctx.set_correlation_id(); + info!( + %correlation_id, log_type=LogTypes::ACCESS_LOG, - client_ip=ctx.request.summary.host, - request_summary=session.request_summary(), - user_agent=ctx.request.header.get("User-Agent"), - request_id=request_id, - ); - debug!( - request_summary=request_summary, - "Decoded request body: {}", - String::from_utf8_lossy(&*ctx.get_request_body()) + request_summary = session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), + user_agent = ctx.request.header.get("User-Agent"), ); + ctx.read_request_body(session).await?; + let handler_response = self.router.call_handler(ctx).await; if handler_response.status == StatusCode::NOT_FOUND && handler_response.body.is_none() { let header = ResponseHeader::build(StatusCode::NOT_FOUND, None)?; @@ -111,12 +111,6 @@ impl ProxyHttp for ReverseProxy { }; ReverseProxy::::set_headers(session, ctx, handler_response.status).await?; - debug!( - request_summary=request_summary, - "Response Body: {}", - String::from_utf8_lossy(&*response_bytes) - ); - // Write the response body to the session after setting headers session.write_response_body(Some(Bytes::from(response_bytes)), true).await?; @@ -134,15 +128,18 @@ impl ProxyHttp for ReverseProxy { status = session.response_written().unwrap().status.as_u16(); } + let correlation_id = ctx.get_correlation_id(); + info!( + %correlation_id, log_type=LogTypes::ACCESS_LOG_RESULT, - client_ip=ctx.request.summary.host, request_summary=session.request_summary(), + origin = ctx.request.header.get("origin"), + referer = ctx.request.header.get("referer"), status=status, - // duration_ms=session.duration_ms(), + latency_ms=ctx.get_latency_ms(), // todo: is it necessary? response_body_size=ctx.get_response_body().len(), user_agent=ctx.request.header.get("User-Agent"), - request_id=session.req_header().headers.get("x-request-id").map(|v| v.to_str().unwrap_or("")).unwrap_or(""), error=?e, ); } diff --git a/utils/src/log.rs b/utils/src/log.rs index 3bb004f..94ec078 100644 --- a/utils/src/log.rs +++ b/utils/src/log.rs @@ -1,3 +1,4 @@ +use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::rolling; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::fmt; @@ -8,17 +9,9 @@ pub fn init_logger( log_format: String, log_folder: String, log_file: String, -) -> Option { +) -> Option { let level_filter = to_level_filter(level); - // Structured JSON logger - let builder = fmt::Subscriber::builder() - .with_max_level(level_filter) - .with_target(true) - .with_file(true) // ✅ include file path - .with_line_number(true) // ✅ include line number - .with_ansi(true); - // Dynamic writer let (writer, guard): (BoxMakeWriter, Option) = if log_folder == "console" { @@ -30,19 +23,24 @@ pub fn init_logger( (BoxMakeWriter::new(non_blocking), Some(guard)) }; - // Build with chosen writer - let builder = builder.with_writer(writer); + // Structured JSON logger + let builder = fmt::Subscriber::builder() + .with_max_level(level_filter) + .with_target(true) + .with_file(true) // ✅ include file path + .with_line_number(true) // ✅ include line number + .with_ansi(true) + .with_writer(writer); if log_format.to_lowercase() != "plain" { builder .json() .with_current_span(true) - .with_current_span(true) .flatten_event(true) .init(); } else { - builder.init(); - } + builder.compact().init(); + }; tracing::info!("Logger initialized"); guard