diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md
index 3d98aed333..89e87fee82 100644
--- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md
+++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md
@@ -22,22 +22,23 @@ The ASM Service Extension expose some configuration. The configuration can be tw
>**GCP requires that the default configuration for the Service Extension should not change.**
-| Environment variable | Default value | Description |
-|---|-----------------|---------------------------------------------------------------------------------------------------------------|
-| `DD_SERVICE_EXTENSION_HOST` | `0.0.0.0` | Host on where the gRPC and HTTP server should listen to. |
-| `DD_SERVICE_EXTENSION_PORT` | `443` | Port used by the gRPC Server.
Envoy Google backend’s is only using secure connection to Service Extension. |
-| `DD_SERVICE_EXTENSION_HEALTHCHECK_PORT` | `80` | Port used for the HTTP server for the health check. |
+| Environment variable | Default value | Description |
+|-------------------------------------------|-----------------|---------------------------------------------------------------------------------------------------------------|
+| `DD_SERVICE_EXTENSION_HOST` | `0.0.0.0` | Host on where the gRPC and HTTP server should listen to. |
+| `DD_SERVICE_EXTENSION_PORT` | `443` | Port used by the gRPC Server.
Envoy Google backend’s is only using secure connection to Service Extension. |
+| `DD_SERVICE_EXTENSION_HEALTHCHECK_PORT` | `80` | Port used for the HTTP server for the health check. |
| `DD_SERVICE_EXTENSION_OBSERVABILITY_MODE` | `false` | Enable observability mode. This will process a request asynchronously (blocking would be disabled). |
-| `DD_SERVICE_EXTENSION_TLS` | `true` | Enable the gRPC TLS layer. Do not modify if you are using GCP. |
-| `DD_SERVICE_EXTENSION_TLS_KEY_FILE` | `localhost.key` | Change the default gRPC TLS layer key. Do not modify if you are using GCP. |
-| `DD_SERVICE_EXTENSION_TLS_CERT_FILE` | `localhost.crt` | Change the default gRPC TLS layer cert. Do not modify if you are using GCP. |
+| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10485760` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. |
+| `DD_SERVICE_EXTENSION_TLS` | `true` | Enable the gRPC TLS layer. Do not modify if you are using GCP. |
+| `DD_SERVICE_EXTENSION_TLS_KEY_FILE` | `localhost.key` | Change the default gRPC TLS layer key. Do not modify if you are using GCP. |
+| `DD_SERVICE_EXTENSION_TLS_CERT_FILE` | `localhost.crt` | Change the default gRPC TLS layer cert. Do not modify if you are using GCP. |
> The Service Extension need to be connected to a deployed [Datadog agent](https://docs.datadoghq.com/agent).
-| Environment variable | Default value | Description |
-|---|---|---|
-| `DD_AGENT_HOST` | `N/A` | Host of a running Datadog Agent. |
-| `DD_TRACE_AGENT_PORT` | `8126` | Port of a running Datadog Agent. |
+| Environment variable | Default value | Description |
+|-----------------------|---------------|----------------------------------|
+| `DD_AGENT_HOST` | `N/A` | Host of a running Datadog Agent. |
+| `DD_TRACE_AGENT_PORT` | `8126` | Port of a running Datadog Agent. |
### SSL Configuration
diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go
index d6b6fa85f0..55ef90a1a8 100644
--- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go
+++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go
@@ -27,6 +27,21 @@ func intEnv(key string, def int) int {
return v
}
+// intEnvNil returns the parsed int value of an environment variable if it exists, or
+// return nil if unset or failed to parse.
+func intEnvNil(key string) *int {
+ vv, ok := env.Lookup(key)
+ if !ok {
+ return nil
+ }
+ v, err := strconv.Atoi(vv)
+ if err != nil {
+ log.Warn("Non-integer value for env var %s. Parse failed with error: %v", key, err)
+ return nil
+ }
+ return &v
+}
+
// IpEnv returns the valid IP value of an environment variable, or def otherwise.
func ipEnv(key string, def net.IP) net.IP {
vv, ok := env.Lookup(key)
diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go
index a3ec104580..eaca1f0146 100644
--- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go
+++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go
@@ -45,7 +45,7 @@ type serviceExtensionConfig struct {
extensionHost string
healthcheckPort string
observabilityMode bool
- bodyParsingSizeLimit int
+ bodyParsingSizeLimit *int
tls *tlsConfig
}
@@ -92,7 +92,7 @@ func loadConfig() serviceExtensionConfig {
healthcheckPortInt := intEnv("DD_SERVICE_EXTENSION_HEALTHCHECK_PORT", 80)
extensionHostStr := ipEnv("DD_SERVICE_EXTENSION_HOST", net.IP{0, 0, 0, 0}).String()
observabilityMode := boolEnv("DD_SERVICE_EXTENSION_OBSERVABILITY_MODE", false)
- bodyParsingSizeLimit := intEnv("DD_APPSEC_BODY_PARSING_SIZE_LIMIT", 0)
+ bodyParsingSizeLimit := intEnvNil("DD_APPSEC_BODY_PARSING_SIZE_LIMIT")
enableTLS := boolEnv("DD_SERVICE_EXTENSION_TLS", true)
keyFile := stringEnv("DD_SERVICE_EXTENSION_TLS_KEY_FILE", "localhost.key")
certFile := stringEnv("DD_SERVICE_EXTENSION_TLS_CERT_FILE", "localhost.crt")
diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go
index 2808169598..aea5390f74 100644
--- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go
+++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go
@@ -78,7 +78,7 @@ func TestLoadConfig_VariousCases(t *testing.T) {
healthcheckPort string
extensionHost string
observabilityMode bool
- bodyParsingSizeLimit int
+ bodyParsingSizeLimit *int
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
@@ -92,7 +92,7 @@ func TestLoadConfig_VariousCases(t *testing.T) {
{
name: "defaults",
env: nil,
- want: want{"443", "80", "0.0.0.0", false, 0, true, "localhost.key", "localhost.crt"},
+ want: want{"443", "80", "0.0.0.0", false, nil, true, "localhost.crt", "localhost.key"},
},
{
name: "valid overrides",
@@ -103,7 +103,7 @@ func TestLoadConfig_VariousCases(t *testing.T) {
"DD_SERVICE_EXTENSION_OBSERVABILITY_MODE": "true",
"DD_APPSEC_BODY_PARSING_SIZE_LIMIT": "100000000",
},
- want: want{"1234", "4321", "127.0.0.1", true, 100000000, true, "localhost.key", "localhost.crt"},
+ want: want{"1234", "4321", "127.0.0.1", true, intPtr(100000000), true, "localhost.crt", "localhost.key"},
},
{
name: "bad values fall back",
@@ -114,14 +114,14 @@ func TestLoadConfig_VariousCases(t *testing.T) {
"DD_APPSEC_BODY_PARSING_SIZE_LIMIT": "notanint",
"DD_SERVICE_EXTENSION_HOST": "notanip",
},
- want: want{"443", "80", "0.0.0.0", false, 0, true, "localhost.key", "localhost.crt"},
+ want: want{"443", "80", "0.0.0.0", false, nil, true, "localhost.crt", "localhost.key"},
},
{
name: "no-tls",
env: map[string]string{
"DD_SERVICE_EXTENSION_TLS": "false",
},
- want: want{"443", "80", "0.0.0.0", false, 0, false, "localhost.key", "localhost.crt"},
+ want: want{"443", "80", "0.0.0.0", false, nil, false, "localhost.key", "localhost.crt"},
},
{
name: "custom-tls",
@@ -129,7 +129,7 @@ func TestLoadConfig_VariousCases(t *testing.T) {
"DD_SERVICE_EXTENSION_TLS_KEY_FILE": "/tls/tls.key",
"DD_SERVICE_EXTENSION_TLS_CERT_FILE": "/tls/tls.crt",
},
- want: want{"443", "80", "0.0.0.0", false, 0, true, "/tls/tls.key", "/tls/tls.crt"},
+ want: want{"443", "80", "0.0.0.0", false, nil, true, "/tls/tls.crt", "/tls/tls.key"},
},
}
@@ -156,10 +156,19 @@ func TestLoadConfig_VariousCases(t *testing.T) {
assert.Equal(t, tc.want.observabilityMode, cfg.observabilityMode, "observabilityMode")
assert.Equal(t, tc.want.bodyParsingSizeLimit, cfg.bodyParsingSizeLimit, "bodyParsingSizeLimit")
+ assert.Equal(t, tc.want.tlsEnabled, cfg.tls != nil, "tlsEnabled")
+ if cfg.tls != nil {
+ assert.Equal(t, tc.want.tlsCertFile, cfg.tls.certFile, "tlsCertFile")
+ assert.Equal(t, tc.want.tlsKeyFile, cfg.tls.keyFile, "tlsKeyFile")
+ }
})
}
}
+func intPtr(v int) *int {
+ return &v
+}
+
// Helpers
func unsetEnv(keys ...string) {
for _, k := range keys {
diff --git a/contrib/envoyproxy/go-control-plane/envoy.go b/contrib/envoyproxy/go-control-plane/envoy.go
index affaf50297..5452f22b64 100644
--- a/contrib/envoyproxy/go-control-plane/envoy.go
+++ b/contrib/envoyproxy/go-control-plane/envoy.go
@@ -9,7 +9,6 @@ import (
"context"
"errors"
"io"
- "sync/atomic"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/DataDog/dd-trace-go/v2/instrumentation/appsec/proxy"
@@ -41,20 +40,26 @@ type AppsecEnvoyConfig struct {
Integration Integration
BlockingUnavailable bool
Context context.Context
- BodyParsingSizeLimit int
+ BodyParsingSizeLimit *int
}
// appsecEnvoyExternalProcessorServer is a server that implements the Envoy ExternalProcessorServer interface.
type appsecEnvoyExternalProcessorServer struct {
envoyextproc.ExternalProcessorServer
config AppsecEnvoyConfig
- requestCounter atomic.Uint32
messageProcessor proxy.Processor
}
// AppsecEnvoyExternalProcessorServer creates a new external processor server with AAP enabled
func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer, config AppsecEnvoyConfig) envoyextproc.ExternalProcessorServer {
- processor := &appsecEnvoyExternalProcessorServer{
+ switch config.Integration {
+ case GCPServiceExtensionIntegration, EnvoyIntegration, IstioIntegration, EnvoyGatewayIntegration:
+ default:
+ instr.Logger().Error("external_processing: invalid proxy integration type %d. Defaulting to GCPServiceExtensionIntegration", config.Integration)
+ config.Integration = GCPServiceExtensionIntegration
+ }
+
+ return &appsecEnvoyExternalProcessorServer{
ExternalProcessorServer: userImplementation,
config: config,
messageProcessor: proxy.NewProcessor(proxy.ProcessorConfig{
@@ -66,15 +71,6 @@ func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.External
BlockMessageFunc: blockActionFunc,
}, instr),
}
-
- switch config.Integration {
- case GCPServiceExtensionIntegration, EnvoyIntegration, IstioIntegration, EnvoyGatewayIntegration:
- default:
- instr.Logger().Error("external_processing: invalid proxy integration type %d. Defaulting to GCPServiceExtensionIntegration", config.Integration)
- config.Integration = GCPServiceExtensionIntegration
- }
-
- return processor
}
type processServerKeyType struct{}
diff --git a/contrib/envoyproxy/go-control-plane/envoy_messages.go b/contrib/envoyproxy/go-control-plane/envoy_messages.go
index 6ee2bfab04..4151af2e66 100644
--- a/contrib/envoyproxy/go-control-plane/envoy_messages.go
+++ b/contrib/envoyproxy/go-control-plane/envoy_messages.go
@@ -87,11 +87,24 @@ func (i Integration) String() string {
}
}
+func (m messageRequestHeaders) BodyParsingSizeLimit(ctx context.Context) int {
+ switch m.component(ctx) {
+ case componentNameGCPServiceExtension:
+ return 0
+ default:
+ return proxy.DefaultBodyParsingSizeLimit
+ }
+}
+
func (m messageRequestHeaders) SpanOptions(ctx context.Context) []tracer.StartSpanOption {
+ return []tracer.StartSpanOption{tracer.Tag(ext.Component, m.component(ctx))}
+}
+
+func (m messageRequestHeaders) component(ctx context.Context) string {
// As the integration (callout container) is run by default with the GCP Service Extension value,
// we can consider that if this flag is false, it means that it is running in a custom integration.
if m.integration != GCPServiceExtensionIntegration {
- return []tracer.StartSpanOption{tracer.Tag(ext.Component, m.integration.String())}
+ return m.integration.String()
}
// In newer version of the documentation, customers are instructed to inject the
@@ -99,23 +112,24 @@ func (m messageRequestHeaders) SpanOptions(ctx context.Context) []tracer.StartSp
if md, ok := metadata.FromIncomingContext(ctx); ok {
valuesEnvoy := md.Get(datadogEnvoyIntegrationHeader)
if len(valuesEnvoy) > 0 && valuesEnvoy[0] == "1" {
- return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameEnvoy)}
+ return componentNameEnvoy
}
valuesIstio := md.Get(datadogIntegrationHeader)
if len(valuesIstio) > 0 && valuesIstio[0] == "1" {
- return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameIstio)}
+ return componentNameIstio
}
// We don't have the ability to add custom headers in envoy gateway EnvoyExtensionPolicy CRD.
// So we fall back to detecting if we are running in k8s or not.
// If we are running in k8s, we assume it is Envoy Gateway, otherwise GCP Service Extension.
if isK8s() {
- return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameEnvoyGateway)}
+ return componentNameEnvoyGateway
}
}
- return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameGCPServiceExtension)}
+ return componentNameGCPServiceExtension
+
}
type responseHeadersEnvoy struct {
diff --git a/contrib/envoyproxy/go-control-plane/envoy_test.go b/contrib/envoyproxy/go-control-plane/envoy_test.go
index 3a271129a7..fb83f3622f 100644
--- a/contrib/envoyproxy/go-control-plane/envoy_test.go
+++ b/contrib/envoyproxy/go-control-plane/envoy_test.go
@@ -38,7 +38,7 @@ func TestAppSec(t *testing.T) {
}
setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) {
- rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 0)
+ rig, err := newEnvoyAppsecRig(t, GCPServiceExtensionIntegration, false, nil)
require.NoError(t, err)
mt := mocktracer.Start()
@@ -280,7 +280,8 @@ func TestAppSecBodyParsingEnabled(t *testing.T) {
}
setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) {
- rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 256)
+ bodyParsingSizeLimit := 256
+ rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, &bodyParsingSizeLimit)
require.NoError(t, err)
mt := mocktracer.Start()
@@ -649,7 +650,8 @@ func TestAppSecAPISecurityBodyParsingEnabled(t *testing.T) {
}
setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) {
- rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 256)
+ bodyParsingSizeLimit := 256
+ rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, &bodyParsingSizeLimit)
require.NoError(t, err)
mt := mocktracer.Start()
@@ -761,9 +763,183 @@ func TestAppSecAPISecurityBodyParsingEnabled(t *testing.T) {
})
}
+func TestAppSecBodyParsingActivation(t *testing.T) {
+ t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/user_rules.json")
+ t.Setenv("DD_APPSEC_WAF_TIMEOUT", "10ms")
+
+ testutils.StartAppSec(t)
+ if !instr.AppSecEnabled() {
+ t.Skip("appsec disabled")
+ }
+
+ setup := func(integration Integration, bodyParsingSizeLimit *int) (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) {
+ rig, err := newEnvoyAppsecRig(t, integration, false, bodyParsingSizeLimit)
+ require.NoError(t, err)
+
+ mt := mocktracer.Start()
+
+ return rig.client, mt, func() {
+ rig.Close()
+ mt.Stop()
+ }
+ }
+
+ // Body parsing disabled by default on GCP Service Extension
+ t.Run("default-gcp-se-no-monitoring-event-on-request-body-parsing", func(t *testing.T) {
+ client, mt, cleanup := setup(GCPServiceExtensionIntegration, nil)
+ defer cleanup()
+
+ ctx := context.Background()
+ stream, err := client.Process(ctx)
+ require.NoError(t, err)
+
+ end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "")
+
+ err = stream.CloseSend()
+ require.NoError(t, err)
+ _, _ = stream.Recv() // to flush the spans
+
+ finished := mt.FinishedSpans()
+ require.Len(t, finished, 1)
+
+ // Check that no appsec event was created
+ span := finished[0]
+ require.NotContains(t, span.Tags(), "appsec.event")
+ require.NotContains(t, span.Tags(), "_dd.appsec.json")
+ })
+
+ t.Run("value_set_zero-gcp-se-no-monitoring-event-on-request-body-parsing", func(t *testing.T) {
+ bodySize := 0
+ client, mt, cleanup := setup(GCPServiceExtensionIntegration, &bodySize)
+ defer cleanup()
+
+ ctx := context.Background()
+ stream, err := client.Process(ctx)
+ require.NoError(t, err)
+
+ end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "")
+
+ err = stream.CloseSend()
+ require.NoError(t, err)
+ _, _ = stream.Recv() // to flush the spans
+
+ finished := mt.FinishedSpans()
+ require.Len(t, finished, 1)
+
+ // Check that no appsec event was created
+ span := finished[0]
+ require.NotContains(t, span.Tags(), "appsec.event")
+ require.NotContains(t, span.Tags(), "_dd.appsec.json")
+ })
+
+ t.Run("value_set_256-gcp-se-monitoring-event-on-request-body-parsing", func(t *testing.T) {
+ bodySize := 256
+ client, mt, cleanup := setup(GCPServiceExtensionIntegration, &bodySize)
+ defer cleanup()
+
+ ctx := context.Background()
+ stream, err := client.Process(ctx)
+ require.NoError(t, err)
+
+ end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "")
+
+ err = stream.CloseSend()
+ require.NoError(t, err)
+ _, _ = stream.Recv() // to flush the spans
+
+ finished := mt.FinishedSpans()
+ require.Len(t, finished, 1)
+
+ // Check that no appsec event was created
+ span := finished[0]
+ require.Contains(t, span.Tags(), "appsec.event")
+ require.Contains(t, span.Tags(), "_dd.appsec.json")
+ })
+
+ t.Run("default-code-envoy-monitoring-event-on-request-body-parsing", func(t *testing.T) {
+ client, mt, cleanup := setup(EnvoyIntegration, nil)
+ defer cleanup()
+
+ ctx := context.Background()
+ stream, err := client.Process(ctx)
+ require.NoError(t, err)
+
+ end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "")
+
+ err = stream.CloseSend()
+ require.NoError(t, err)
+ _, _ = stream.Recv() // to flush the spans
+
+ finished := mt.FinishedSpans()
+ require.Len(t, finished, 1)
+
+ // Check that no appsec event was created
+ span := finished[0]
+ require.Contains(t, span.Tags(), "appsec.event")
+ require.Contains(t, span.Tags(), "_dd.appsec.json")
+ })
+
+ t.Run("default-metadata-envoy-monitoring-event-on-request-body-parsing", func(t *testing.T) {
+ client, mt, cleanup := setup(GCPServiceExtensionIntegration, nil)
+ defer cleanup()
+
+ // Set the metadata for an envoy request
+ md := metadata.New(map[string]string{
+ "x-datadog-envoy-integration": "1",
+ })
+ ctx := metadata.NewOutgoingContext(context.Background(), md)
+
+ stream, err := client.Process(ctx)
+ require.NoError(t, err)
+
+ end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "")
+
+ err = stream.CloseSend()
+ require.NoError(t, err)
+ _, _ = stream.Recv() // to flush the spans
+
+ finished := mt.FinishedSpans()
+ require.Len(t, finished, 1)
+
+ // Check that no appsec event was created
+ span := finished[0]
+ require.Contains(t, span.Tags(), "appsec.event")
+ require.Contains(t, span.Tags(), "_dd.appsec.json")
+ })
+
+ t.Run("value_set_zero-headers-envoy-no-monitoring-event-on-request-body-parsing", func(t *testing.T) {
+ bodySize := 0
+ client, mt, cleanup := setup(GCPServiceExtensionIntegration, &bodySize)
+ defer cleanup()
+
+ // Set the metadata for an envoy request
+ md := metadata.New(map[string]string{
+ "x-datadog-envoy-integration": "1",
+ })
+ ctx := metadata.NewOutgoingContext(context.Background(), md)
+
+ stream, err := client.Process(ctx)
+ require.NoError(t, err)
+
+ end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "")
+
+ err = stream.CloseSend()
+ require.NoError(t, err)
+ _, _ = stream.Recv() // to flush the spans
+
+ finished := mt.FinishedSpans()
+ require.Len(t, finished, 1)
+
+ // Check that no appsec event was created
+ span := finished[0]
+ require.NotContains(t, span.Tags(), "appsec.event")
+ require.NotContains(t, span.Tags(), "_dd.appsec.json")
+ })
+}
+
func TestGeneratedSpan(t *testing.T) {
setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) {
- rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 0)
+ rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, nil)
require.NoError(t, err)
mt := mocktracer.Start()
@@ -851,7 +1027,7 @@ func TestMalformedEnvoyProcessing(t *testing.T) {
}
setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) {
- rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 0)
+ rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, nil)
require.NoError(t, err)
mt := mocktracer.Start()
@@ -926,7 +1102,7 @@ func TestAppSecComponentName(t *testing.T) {
}
setup := func(integration Integration) (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) {
- rig, err := newEnvoyAppsecRig(t, integration, false, 0)
+ rig, err := newEnvoyAppsecRig(t, integration, false, nil)
require.NoError(t, err)
mt := mocktracer.Start()
@@ -1056,7 +1232,7 @@ func TestAppSecComponentName(t *testing.T) {
})
}
-func newEnvoyAppsecRig(t *testing.T, integration Integration, blockingUnavailable bool, bodyParsingSizeLimit int) (*envoyAppsecRig, error) {
+func newEnvoyAppsecRig(t *testing.T, integration Integration, blockingUnavailable bool, bodyParsingSizeLimit *int) (*envoyAppsecRig, error) {
t.Helper()
server := grpc.NewServer()
diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md
index bad026f4ad..637bd508cd 100644
--- a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md
+++ b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md
@@ -19,13 +19,12 @@ docker build -f contrib/haproxy/stream-processing-offload/cmd/spoa/Dockerfile -t
The HAProxy SPOA agent expose some configuration:
-| Environment variable | Default value | Description |
-|-------------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------|
-| `DD_HAPROXY_SPOA_HOST` | `0.0.0.0` | Host on where the SPOA and HTTP server should listen to. |
-| `DD_HAPROXY_SPOA_PORT` | `3000` | Port used by the SPOA that accept communication with HAProxy. |
-| `DD_HAPROXY_SPOA_HEALTHCHECK_PORT` | `3080` | Port used for the HTTP server for the health check. |
-| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `0` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. The recommended value is `10000000` (10MB). |
-|
+| Environment variable | Default value | Description |
+|-------------------------------------|---------------|-------------------------------------------------------------------------------------------------|
+| `DD_HAPROXY_SPOA_HOST` | `0.0.0.0` | Host on where the SPOA and HTTP server should listen to. |
+| `DD_HAPROXY_SPOA_PORT` | `3000` | Port used by the SPOA that accept communication with HAProxy. |
+| `DD_HAPROXY_SPOA_HEALTHCHECK_PORT` | `3080` | Port used for the HTTP server for the health check. |
+| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10485760` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. |
> The HAProxy SPOA need to be connected to a deployed [Datadog agent](https://docs.datadoghq.com/agent).
@@ -33,4 +32,3 @@ The HAProxy SPOA agent expose some configuration:
|-----------------------|---------------|----------------------------------|
| `DD_AGENT_HOST` | `localhost` | Host of a running Datadog Agent. |
| `DD_TRACE_AGENT_PORT` | `8126` | Port of a running Datadog Agent. |
-
diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go b/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go
index bb0c7c6342..4cd1c3c430 100644
--- a/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go
+++ b/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go
@@ -22,6 +22,7 @@ import (
"github.com/DataDog/dd-trace-go/contrib/haproxy/stream-processing-offload/v2"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
+ "github.com/DataDog/dd-trace-go/v2/instrumentation/appsec/proxy"
)
type haProxySpoaConfig struct {
@@ -59,7 +60,7 @@ func loadConfig() haProxySpoaConfig {
extensionHostStr := ipEnv("DD_HAPROXY_SPOA_HOST", net.IP{0, 0, 0, 0}).String()
extensionPortInt := intEnv("DD_HAPROXY_SPOA_PORT", 3000)
healthcheckPortInt := intEnv("DD_HAPROXY_SPOA_HEALTHCHECK_PORT", 3080)
- bodyParsingSizeLimit := intEnv("DD_APPSEC_BODY_PARSING_SIZE_LIMIT", 0)
+ bodyParsingSizeLimit := intEnv("DD_APPSEC_BODY_PARSING_SIZE_LIMIT", proxy.DefaultBodyParsingSizeLimit)
extensionPortStr := strconv.FormatInt(int64(extensionPortInt), 10)
healthcheckPortStr := strconv.FormatInt(int64(healthcheckPortInt), 10)
diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go b/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go
index 20127563d8..22c460a62d 100644
--- a/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go
+++ b/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go
@@ -9,6 +9,7 @@ import (
"os"
"testing"
+ "github.com/DataDog/dd-trace-go/v2/instrumentation/appsec/proxy"
"github.com/stretchr/testify/assert"
)
@@ -80,7 +81,7 @@ func TestLoadConfig_VariousCases(t *testing.T) {
{
name: "defaults",
env: nil,
- want: want{"3000", "3080", "0.0.0.0", 0},
+ want: want{"3000", "3080", "0.0.0.0", proxy.DefaultBodyParsingSizeLimit},
},
{
name: "valid overrides",
@@ -102,7 +103,7 @@ func TestLoadConfig_VariousCases(t *testing.T) {
"DD_APPSEC_BODY_PARSING_SIZE_LIMIT": "notanint",
"DD_HAPROXY_SPOA_HOST": "notanip",
},
- want: want{"3000", "3080", "0.0.0.0", 0},
+ want: want{"3000", "3080", "0.0.0.0", proxy.DefaultBodyParsingSizeLimit},
},
}
diff --git a/contrib/haproxy/stream-processing-offload/haproxy.go b/contrib/haproxy/stream-processing-offload/haproxy.go
index 6c47b8b20d..4bc85c4d95 100644
--- a/contrib/haproxy/stream-processing-offload/haproxy.go
+++ b/contrib/haproxy/stream-processing-offload/haproxy.go
@@ -46,7 +46,7 @@ func NewHAProxySPOA(config AppsecHAProxyConfig) *HAProxySPOA {
return &HAProxySPOA{
messageProcessor: proxy.NewProcessor(proxy.ProcessorConfig{
BlockingUnavailable: config.BlockingUnavailable,
- BodyParsingSizeLimit: config.BodyParsingSizeLimit,
+ BodyParsingSizeLimit: &config.BodyParsingSizeLimit,
Framework: "haproxy/haproxy",
Context: config.Context,
ContinueMessageFunc: continueActionFunc,
diff --git a/contrib/haproxy/stream-processing-offload/haproxy_messages.go b/contrib/haproxy/stream-processing-offload/haproxy_messages.go
index 999e898a5c..a9d60f2f02 100644
--- a/contrib/haproxy/stream-processing-offload/haproxy_messages.go
+++ b/contrib/haproxy/stream-processing-offload/haproxy_messages.go
@@ -58,6 +58,10 @@ type messageRequestHeaders struct {
hasBody bool
}
+func (m *messageRequestHeaders) BodyParsingSizeLimit(_ context.Context) int {
+ return proxy.DefaultBodyParsingSizeLimit
+}
+
func (m *messageRequestHeaders) ExtractRequest(_ context.Context) (proxy.PseudoRequest, error) {
headers, err := parseHAProxyReqHdrsBin(m.msg.Bytes(VarHeaders))
if err != nil {
diff --git a/contrib/k8s.io/gateway-api/processor_types.go b/contrib/k8s.io/gateway-api/processor_types.go
index 63daa84ce2..f81cbbd5e0 100644
--- a/contrib/k8s.io/gateway-api/processor_types.go
+++ b/contrib/k8s.io/gateway-api/processor_types.go
@@ -43,6 +43,10 @@ func (r requestHeader) SpanOptions(_ context.Context) []tracer.StartSpanOption {
return r.spanOptions
}
+func (r requestHeader) BodyParsingSizeLimit(_ context.Context) int {
+ return proxy.DefaultBodyParsingSizeLimit
+}
+
var _ proxy.HTTPBody = (*requestBody)(nil)
type requestBody struct {
diff --git a/contrib/k8s.io/gateway-api/request-mirror.go b/contrib/k8s.io/gateway-api/request-mirror.go
index f65b1cd806..1bd71aa558 100644
--- a/contrib/k8s.io/gateway-api/request-mirror.go
+++ b/contrib/k8s.io/gateway-api/request-mirror.go
@@ -24,8 +24,7 @@ import (
)
const (
- maxBodyBytes = 5 * 1 << 20 // 5 MB
- framework = "k8s.io/gateway-api"
+ framework = "k8s.io/gateway-api"
)
var (
@@ -59,10 +58,11 @@ func HTTPRequestMirrorHandler(config Config) http.Handler {
config.Hijack = ptr.To[bool](true)
}
+ bodyProcessingMaxBytes := proxy.DefaultBodyParsingSizeLimit
processor := proxy.NewProcessor(proxy.ProcessorConfig{
Context: context.Background(),
BlockingUnavailable: true,
- BodyParsingSizeLimit: maxBodyBytes,
+ BodyParsingSizeLimit: &bodyProcessingMaxBytes,
Framework: framework,
ContinueMessageFunc: func(_ context.Context, _ proxy.ContinueActionOptions) error { return nil },
BlockMessageFunc: func(_ context.Context, _ proxy.BlockActionOptions) error { return nil },
@@ -86,7 +86,7 @@ func HTTPRequestMirrorHandler(config Config) http.Handler {
defer reqState.Close()
- body, err := io.ReadAll(io.LimitReader(r.Body, maxBodyBytes+1))
+ body, err := io.ReadAll(io.LimitReader(r.Body, int64(bodyProcessingMaxBytes+1)))
if err := processor.OnRequestBody(requestBody{body: body}, &reqState); err != nil {
logger.Error("Failed to process request body: %v", err)
return
diff --git a/instrumentation/appsec/proxy/config.go b/instrumentation/appsec/proxy/config.go
index dc5adeaa1f..0f2249a21e 100644
--- a/instrumentation/appsec/proxy/config.go
+++ b/instrumentation/appsec/proxy/config.go
@@ -9,6 +9,11 @@ import (
"context"
)
+const (
+ // DefaultBodyParsingSizeLimit is the default number of bytes parsed for body analysis.
+ DefaultBodyParsingSizeLimit = 10 * 1 << 20 // 10MB
+)
+
// ContinueActionOptions contains options for the continue action created through [ProcessorConfig.ContinueMessageFunc].
type ContinueActionOptions struct {
// HeaderMutations are the HTTP header mutations to be applied to the message (default is empty)
@@ -34,7 +39,7 @@ type BlockActionOptions struct {
type ProcessorConfig struct {
context.Context
BlockingUnavailable bool
- BodyParsingSizeLimit int
+ BodyParsingSizeLimit *int
Framework string
// ContinueMessageFunc is a function that generates a continue message of type O based on the provided ContinueActionOptions.
diff --git a/instrumentation/appsec/proxy/interfaces.go b/instrumentation/appsec/proxy/interfaces.go
index 1004db221e..eb2cb20079 100644
--- a/instrumentation/appsec/proxy/interfaces.go
+++ b/instrumentation/appsec/proxy/interfaces.go
@@ -25,6 +25,9 @@ type RequestHeaders interface {
// SpanOptions returns additional options to use when starting the span for this request.
SpanOptions(context.Context) []tracer.StartSpanOption
+
+ // BodyParsingSizeLimit returns the default value for body processing based on the request.
+ BodyParsingSizeLimit(ctx context.Context) int
}
// HTTPBody is an interface for accessing the body of an HTTP message used by the message processor.
diff --git a/instrumentation/appsec/proxy/message_processor.go b/instrumentation/appsec/proxy/message_processor.go
index 73480fde05..28819308be 100644
--- a/instrumentation/appsec/proxy/message_processor.go
+++ b/instrumentation/appsec/proxy/message_processor.go
@@ -11,6 +11,7 @@ import (
"fmt"
"io"
"sync"
+ "sync/atomic"
"github.com/DataDog/dd-trace-go/v2/appsec"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
@@ -27,18 +28,15 @@ type Processor struct {
ProcessorConfig
instr *instrumentation.Instrumentation
- metrics *metrics
- done context.CancelFunc
- firstRequest sync.Once
+ metrics *metrics
+ done context.CancelFunc
+ firstRequest sync.Once
+ computedBodyParsingSizeLimit atomic.Int64
}
// NewProcessor creates a new [Processor] instance with the given configuration and instrumentation
// It also initializes the metrics reporter and a context cancellation function
func NewProcessor(config ProcessorConfig, instr *instrumentation.Instrumentation) Processor {
- if config.BodyParsingSizeLimit <= 0 {
- instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.")
- }
-
if config.Context == nil {
config.Context = context.Background()
}
@@ -57,10 +55,6 @@ func NewProcessor(config ProcessorConfig, instr *instrumentation.Instrumentation
// along with an optional output message of type O created by either [ProcessorConfig.ContinueMessageFunc] or [ProcessorConfig.BlockMessageFunc]
// If the request is blocked or the message ends the stream, it returns io.EOF as error
func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (reqState RequestState, err error) {
- mp.firstRequest.Do(func() {
- mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.BodyParsingSizeLimit, mp.Framework)
- })
-
mp.metrics.incrementRequestCount()
pseudoRequest, err := req.ExtractRequest(ctx)
if err != nil {
@@ -72,9 +66,24 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (
return reqState, fmt.Errorf("error converting to net/http request: %w", err)
}
+ mp.firstRequest.Do(func() {
+ var bodyLimit int64
+ if mp.BodyParsingSizeLimit != nil {
+ bodyLimit = int64(*mp.BodyParsingSizeLimit)
+ } else {
+ bodyLimit = int64(req.BodyParsingSizeLimit(ctx))
+ }
+ mp.computedBodyParsingSizeLimit.Store(bodyLimit)
+
+ if bodyLimit <= 0 {
+ mp.instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.")
+ }
+ mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.computedBodyParsingSizeLimit.Load(), mp.Framework)
+ })
+
reqState, blocked := newRequestState(
httpRequest,
- mp.BodyParsingSizeLimit,
+ int(mp.computedBodyParsingSizeLimit.Load()),
mp.Framework,
req.SpanOptions(ctx)...,
)
@@ -127,7 +136,7 @@ func (mp *Processor) OnRequestBody(req HTTPBody, reqState *RequestState) error {
mp.instr.Logger().Debug("message_processor: received request body: %v - EOS: %v\n", len(req.GetBody()), req.GetEndOfStream())
- if mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeRequestBody {
+ if mp.computedBodyParsingSizeLimit.Load() <= 0 || reqState.State != MessageTypeRequestBody {
mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. " +
"Please refer to the official documentation for guidance on the proper settings or contact support.")
@@ -211,7 +220,7 @@ func (mp *Processor) OnResponseBody(resp HTTPBody, reqState *RequestState) error
mp.instr.Logger().Debug("message_processor: received response body: %v - EOS: %v\n", len(resp.GetBody()), resp.GetEndOfStream())
- if mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeResponseBody {
+ if mp.computedBodyParsingSizeLimit.Load() <= 0 || reqState.State != MessageTypeResponseBody {
mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. " +
"Please refer to the official documentation for guidance on the proper settings or contact support.")
return io.EOF
@@ -268,7 +277,7 @@ func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos b
// isBodySupported checks if the body should be analyzed based on content type
func (mp *Processor) isBodySupported(contentType string) bool {
- if mp.BodyParsingSizeLimit <= 0 {
+ if mp.computedBodyParsingSizeLimit.Load() <= 0 {
return false
}