From ddaf5a297a777bbf8ee4c68efa8dc38af8154d55 Mon Sep 17 00:00:00 2001 From: Flavien Darche Date: Thu, 23 Oct 2025 17:46:04 +0200 Subject: [PATCH 1/6] enable body processing by default on all proxies integration except for GCP Service Extension --- .../cmd/serviceextensions/env.go | 15 ++ .../cmd/serviceextensions/main.go | 4 +- .../cmd/serviceextensions/main_test.go | 21 +- contrib/envoyproxy/go-control-plane/envoy.go | 28 +-- .../go-control-plane/envoy_messages.go | 24 ++- .../envoyproxy/go-control-plane/envoy_test.go | 190 +++++++++++++++++- .../cmd/spoa/README.md | 3 +- .../cmd/spoa/main.go | 3 +- .../cmd/spoa/main_test.go | 5 +- .../stream-processing-offload/haproxy.go | 2 +- .../haproxy_messages.go | 4 + contrib/k8s.io/gateway-api/processor_types.go | 4 + contrib/k8s.io/gateway-api/request-mirror.go | 8 +- instrumentation/appsec/proxy/config.go | 7 +- instrumentation/appsec/proxy/interfaces.go | 3 + .../appsec/proxy/message_processor.go | 35 ++-- 16 files changed, 300 insertions(+), 56 deletions(-) diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go index d6b6fa85f0..55ef90a1a8 100644 --- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go +++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go @@ -27,6 +27,21 @@ func intEnv(key string, def int) int { return v } +// intEnvNil returns the parsed int value of an environment variable if it exists, or +// return nil if unset or failed to parse. +func intEnvNil(key string) *int { + vv, ok := env.Lookup(key) + if !ok { + return nil + } + v, err := strconv.Atoi(vv) + if err != nil { + log.Warn("Non-integer value for env var %s. Parse failed with error: %v", key, err) + return nil + } + return &v +} + // IpEnv returns the valid IP value of an environment variable, or def otherwise. 
func ipEnv(key string, def net.IP) net.IP { vv, ok := env.Lookup(key) diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go index a3ec104580..eaca1f0146 100644 --- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go +++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go @@ -45,7 +45,7 @@ type serviceExtensionConfig struct { extensionHost string healthcheckPort string observabilityMode bool - bodyParsingSizeLimit int + bodyParsingSizeLimit *int tls *tlsConfig } @@ -92,7 +92,7 @@ func loadConfig() serviceExtensionConfig { healthcheckPortInt := intEnv("DD_SERVICE_EXTENSION_HEALTHCHECK_PORT", 80) extensionHostStr := ipEnv("DD_SERVICE_EXTENSION_HOST", net.IP{0, 0, 0, 0}).String() observabilityMode := boolEnv("DD_SERVICE_EXTENSION_OBSERVABILITY_MODE", false) - bodyParsingSizeLimit := intEnv("DD_APPSEC_BODY_PARSING_SIZE_LIMIT", 0) + bodyParsingSizeLimit := intEnvNil("DD_APPSEC_BODY_PARSING_SIZE_LIMIT") enableTLS := boolEnv("DD_SERVICE_EXTENSION_TLS", true) keyFile := stringEnv("DD_SERVICE_EXTENSION_TLS_KEY_FILE", "localhost.key") certFile := stringEnv("DD_SERVICE_EXTENSION_TLS_CERT_FILE", "localhost.crt") diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go index 2808169598..aea5390f74 100644 --- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go +++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main_test.go @@ -78,7 +78,7 @@ func TestLoadConfig_VariousCases(t *testing.T) { healthcheckPort string extensionHost string observabilityMode bool - bodyParsingSizeLimit int + bodyParsingSizeLimit *int tlsEnabled bool tlsCertFile string tlsKeyFile string @@ -92,7 +92,7 @@ func TestLoadConfig_VariousCases(t *testing.T) { { name: "defaults", env: nil, - want: want{"443", "80", "0.0.0.0", false, 0, true, "localhost.key", 
"localhost.crt"}, + want: want{"443", "80", "0.0.0.0", false, nil, true, "localhost.crt", "localhost.key"}, }, { name: "valid overrides", @@ -103,7 +103,7 @@ func TestLoadConfig_VariousCases(t *testing.T) { "DD_SERVICE_EXTENSION_OBSERVABILITY_MODE": "true", "DD_APPSEC_BODY_PARSING_SIZE_LIMIT": "100000000", }, - want: want{"1234", "4321", "127.0.0.1", true, 100000000, true, "localhost.key", "localhost.crt"}, + want: want{"1234", "4321", "127.0.0.1", true, intPtr(100000000), true, "localhost.crt", "localhost.key"}, }, { name: "bad values fall back", @@ -114,14 +114,14 @@ func TestLoadConfig_VariousCases(t *testing.T) { "DD_APPSEC_BODY_PARSING_SIZE_LIMIT": "notanint", "DD_SERVICE_EXTENSION_HOST": "notanip", }, - want: want{"443", "80", "0.0.0.0", false, 0, true, "localhost.key", "localhost.crt"}, + want: want{"443", "80", "0.0.0.0", false, nil, true, "localhost.crt", "localhost.key"}, }, { name: "no-tls", env: map[string]string{ "DD_SERVICE_EXTENSION_TLS": "false", }, - want: want{"443", "80", "0.0.0.0", false, 0, false, "localhost.key", "localhost.crt"}, + want: want{"443", "80", "0.0.0.0", false, nil, false, "localhost.key", "localhost.crt"}, }, { name: "custom-tls", @@ -129,7 +129,7 @@ func TestLoadConfig_VariousCases(t *testing.T) { "DD_SERVICE_EXTENSION_TLS_KEY_FILE": "/tls/tls.key", "DD_SERVICE_EXTENSION_TLS_CERT_FILE": "/tls/tls.crt", }, - want: want{"443", "80", "0.0.0.0", false, 0, true, "/tls/tls.key", "/tls/tls.crt"}, + want: want{"443", "80", "0.0.0.0", false, nil, true, "/tls/tls.crt", "/tls/tls.key"}, }, } @@ -156,10 +156,19 @@ func TestLoadConfig_VariousCases(t *testing.T) { assert.Equal(t, tc.want.observabilityMode, cfg.observabilityMode, "observabilityMode") assert.Equal(t, tc.want.bodyParsingSizeLimit, cfg.bodyParsingSizeLimit, "bodyParsingSizeLimit") + assert.Equal(t, tc.want.tlsEnabled, cfg.tls != nil, "tlsEnabled") + if cfg.tls != nil { + assert.Equal(t, tc.want.tlsCertFile, cfg.tls.certFile, "tlsCertFile") + assert.Equal(t, tc.want.tlsKeyFile, 
cfg.tls.keyFile, "tlsKeyFile") + } }) } } +func intPtr(v int) *int { + return &v +} + // Helpers func unsetEnv(keys ...string) { for _, k := range keys { diff --git a/contrib/envoyproxy/go-control-plane/envoy.go b/contrib/envoyproxy/go-control-plane/envoy.go index affaf50297..0953929262 100644 --- a/contrib/envoyproxy/go-control-plane/envoy.go +++ b/contrib/envoyproxy/go-control-plane/envoy.go @@ -9,7 +9,6 @@ import ( "context" "errors" "io" - "sync/atomic" "github.com/DataDog/dd-trace-go/v2/instrumentation" "github.com/DataDog/dd-trace-go/v2/instrumentation/appsec/proxy" @@ -41,20 +40,32 @@ type AppsecEnvoyConfig struct { Integration Integration BlockingUnavailable bool Context context.Context - BodyParsingSizeLimit int + BodyParsingSizeLimit *int } // appsecEnvoyExternalProcessorServer is a server that implements the Envoy ExternalProcessorServer interface. type appsecEnvoyExternalProcessorServer struct { envoyextproc.ExternalProcessorServer config AppsecEnvoyConfig - requestCounter atomic.Uint32 messageProcessor proxy.Processor } // AppsecEnvoyExternalProcessorServer creates a new external processor server with AAP enabled func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer, config AppsecEnvoyConfig) envoyextproc.ExternalProcessorServer { - processor := &appsecEnvoyExternalProcessorServer{ + switch config.Integration { + case GCPServiceExtensionIntegration: + case EnvoyIntegration, IstioIntegration, EnvoyGatewayIntegration: + // Set default body parsing size limit if not specified for non-default integrations + if config.BodyParsingSizeLimit == nil { + defaultBody := proxy.DefaultBodyParsingSizeLimit + config.BodyParsingSizeLimit = &defaultBody + } + default: + instr.Logger().Error("external_processing: invalid proxy integration type %d. 
Defaulting to GCPServiceExtensionIntegration", config.Integration) + config.Integration = GCPServiceExtensionIntegration + } + + return &appsecEnvoyExternalProcessorServer{ ExternalProcessorServer: userImplementation, config: config, messageProcessor: proxy.NewProcessor(proxy.ProcessorConfig{ @@ -66,15 +77,6 @@ func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.External BlockMessageFunc: blockActionFunc, }, instr), } - - switch config.Integration { - case GCPServiceExtensionIntegration, EnvoyIntegration, IstioIntegration, EnvoyGatewayIntegration: - default: - instr.Logger().Error("external_processing: invalid proxy integration type %d. Defaulting to GCPServiceExtensionIntegration", config.Integration) - config.Integration = GCPServiceExtensionIntegration - } - - return processor } type processServerKeyType struct{} diff --git a/contrib/envoyproxy/go-control-plane/envoy_messages.go b/contrib/envoyproxy/go-control-plane/envoy_messages.go index 6ee2bfab04..4151af2e66 100644 --- a/contrib/envoyproxy/go-control-plane/envoy_messages.go +++ b/contrib/envoyproxy/go-control-plane/envoy_messages.go @@ -87,11 +87,24 @@ func (i Integration) String() string { } } +func (m messageRequestHeaders) BodyParsingSizeLimit(ctx context.Context) int { + switch m.component(ctx) { + case componentNameGCPServiceExtension: + return 0 + default: + return proxy.DefaultBodyParsingSizeLimit + } +} + func (m messageRequestHeaders) SpanOptions(ctx context.Context) []tracer.StartSpanOption { + return []tracer.StartSpanOption{tracer.Tag(ext.Component, m.component(ctx))} +} + +func (m messageRequestHeaders) component(ctx context.Context) string { // As the integration (callout container) is run by default with the GCP Service Extension value, // we can consider that if this flag is false, it means that it is running in a custom integration. 
if m.integration != GCPServiceExtensionIntegration { - return []tracer.StartSpanOption{tracer.Tag(ext.Component, m.integration.String())} + return m.integration.String() } // In newer version of the documentation, customers are instructed to inject the @@ -99,23 +112,24 @@ func (m messageRequestHeaders) SpanOptions(ctx context.Context) []tracer.StartSp if md, ok := metadata.FromIncomingContext(ctx); ok { valuesEnvoy := md.Get(datadogEnvoyIntegrationHeader) if len(valuesEnvoy) > 0 && valuesEnvoy[0] == "1" { - return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameEnvoy)} + return componentNameEnvoy } valuesIstio := md.Get(datadogIntegrationHeader) if len(valuesIstio) > 0 && valuesIstio[0] == "1" { - return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameIstio)} + return componentNameIstio } // We don't have the ability to add custom headers in envoy gateway EnvoyExtensionPolicy CRD. // So we fall back to detecting if we are running in k8s or not. // If we are running in k8s, we assume it is Envoy Gateway, otherwise GCP Service Extension. 
if isK8s() { - return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameEnvoyGateway)} + return componentNameEnvoyGateway } } - return []tracer.StartSpanOption{tracer.Tag(ext.Component, componentNameGCPServiceExtension)} + return componentNameGCPServiceExtension + } type responseHeadersEnvoy struct { diff --git a/contrib/envoyproxy/go-control-plane/envoy_test.go b/contrib/envoyproxy/go-control-plane/envoy_test.go index 3a271129a7..fb83f3622f 100644 --- a/contrib/envoyproxy/go-control-plane/envoy_test.go +++ b/contrib/envoyproxy/go-control-plane/envoy_test.go @@ -38,7 +38,7 @@ func TestAppSec(t *testing.T) { } setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { - rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 0) + rig, err := newEnvoyAppsecRig(t, GCPServiceExtensionIntegration, false, nil) require.NoError(t, err) mt := mocktracer.Start() @@ -280,7 +280,8 @@ func TestAppSecBodyParsingEnabled(t *testing.T) { } setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { - rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 256) + bodyParsingSizeLimit := 256 + rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, &bodyParsingSizeLimit) require.NoError(t, err) mt := mocktracer.Start() @@ -649,7 +650,8 @@ func TestAppSecAPISecurityBodyParsingEnabled(t *testing.T) { } setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { - rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 256) + bodyParsingSizeLimit := 256 + rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, &bodyParsingSizeLimit) require.NoError(t, err) mt := mocktracer.Start() @@ -761,9 +763,183 @@ func TestAppSecAPISecurityBodyParsingEnabled(t *testing.T) { }) } +func TestAppSecBodyParsingActivation(t *testing.T) { + t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/user_rules.json") + t.Setenv("DD_APPSEC_WAF_TIMEOUT", "10ms") + + testutils.StartAppSec(t) + if 
!instr.AppSecEnabled() { + t.Skip("appsec disabled") + } + + setup := func(integration Integration, bodyParsingSizeLimit *int) (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, integration, false, bodyParsingSizeLimit) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + // Body parsing disabled by default on GCP Service Extension + t.Run("default-gcp-se-no-monitoring-event-on-request-body-parsing", func(t *testing.T) { + client, mt, cleanup := setup(GCPServiceExtensionIntegration, nil) + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "") + + err = stream.CloseSend() + require.NoError(t, err) + _, _ = stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check that no appsec event was created + span := finished[0] + require.NotContains(t, span.Tags(), "appsec.event") + require.NotContains(t, span.Tags(), "_dd.appsec.json") + }) + + t.Run("value_set_zero-gcp-se-no-monitoring-event-on-request-body-parsing", func(t *testing.T) { + bodySize := 0 + client, mt, cleanup := setup(GCPServiceExtensionIntegration, &bodySize) + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "") + + err = stream.CloseSend() + require.NoError(t, err) + _, _ = stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check that no appsec event was created + span := finished[0] + 
require.NotContains(t, span.Tags(), "appsec.event") + require.NotContains(t, span.Tags(), "_dd.appsec.json") + }) + + t.Run("value_set_256-gcp-se-monitoring-event-on-request-body-parsing", func(t *testing.T) { + bodySize := 256 + client, mt, cleanup := setup(GCPServiceExtensionIntegration, &bodySize) + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "") + + err = stream.CloseSend() + require.NoError(t, err) + _, _ = stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check that an appsec event was created + span := finished[0] + require.Contains(t, span.Tags(), "appsec.event") + require.Contains(t, span.Tags(), "_dd.appsec.json") + }) + + t.Run("default-code-envoy-monitoring-event-on-request-body-parsing", func(t *testing.T) { + client, mt, cleanup := setup(EnvoyIntegration, nil) + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "") + + err = stream.CloseSend() + require.NoError(t, err) + _, _ = stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check that an appsec event was created + span := finished[0] + require.Contains(t, span.Tags(), "appsec.event") + require.Contains(t, span.Tags(), "_dd.appsec.json") + }) + + t.Run("default-metadata-envoy-monitoring-event-on-request-body-parsing", func(t *testing.T) { + client, mt, cleanup := setup(GCPServiceExtensionIntegration, nil) + defer cleanup() + + // Set the metadata for an envoy request + md := 
metadata.New(map[string]string{ + "x-datadog-envoy-integration": "1", + }) + ctx := metadata.NewOutgoingContext(context.Background(), md) + + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "") + + err = stream.CloseSend() + require.NoError(t, err) + _, _ = stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check that an appsec event was created + span := finished[0] + require.Contains(t, span.Tags(), "appsec.event") + require.Contains(t, span.Tags(), "_dd.appsec.json") + }) + + t.Run("value_set_zero-headers-envoy-no-monitoring-event-on-request-body-parsing", func(t *testing.T) { + bodySize := 0 + client, mt, cleanup := setup(GCPServiceExtensionIntegration, &bodySize) + defer cleanup() + + // Set the metadata for an envoy request + md := metadata.New(map[string]string{ + "x-datadog-envoy-integration": "1", + }) + ctx := metadata.NewOutgoingContext(context.Background(), md) + + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "PUT", map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, map[string]string{}, false, false, `{ "name": "" }`, "") + + err = stream.CloseSend() + require.NoError(t, err) + _, _ = stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check that no appsec event was created + span := finished[0] + require.NotContains(t, span.Tags(), "appsec.event") + require.NotContains(t, span.Tags(), "_dd.appsec.json") + }) +} + func TestGeneratedSpan(t *testing.T) { setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { - rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 0) + rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, nil) 
require.NoError(t, err) mt := mocktracer.Start() @@ -851,7 +1027,7 @@ func TestMalformedEnvoyProcessing(t *testing.T) { } setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { - rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, 0) + rig, err := newEnvoyAppsecRig(t, EnvoyIntegration, false, nil) require.NoError(t, err) mt := mocktracer.Start() @@ -926,7 +1102,7 @@ func TestAppSecComponentName(t *testing.T) { } setup := func(integration Integration) (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { - rig, err := newEnvoyAppsecRig(t, integration, false, 0) + rig, err := newEnvoyAppsecRig(t, integration, false, nil) require.NoError(t, err) mt := mocktracer.Start() @@ -1056,7 +1232,7 @@ func TestAppSecComponentName(t *testing.T) { }) } -func newEnvoyAppsecRig(t *testing.T, integration Integration, blockingUnavailable bool, bodyParsingSizeLimit int) (*envoyAppsecRig, error) { +func newEnvoyAppsecRig(t *testing.T, integration Integration, blockingUnavailable bool, bodyParsingSizeLimit *int) (*envoyAppsecRig, error) { t.Helper() server := grpc.NewServer() diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md index bad026f4ad..9c74b5aa7a 100644 --- a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md +++ b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md @@ -24,7 +24,7 @@ The HAProxy SPOA agent expose some configuration: | `DD_HAPROXY_SPOA_HOST` | `0.0.0.0` | Host on where the SPOA and HTTP server should listen to. | | `DD_HAPROXY_SPOA_PORT` | `3000` | Port used by the SPOA that accept communication with HAProxy. | | `DD_HAPROXY_SPOA_HEALTHCHECK_PORT` | `3080` | Port used for the HTTP server for the health check. | -| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `0` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. The recommended value is `10000000` (10MB). 
| +| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10000000` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. | | > The HAProxy SPOA need to be connected to a deployed [Datadog agent](https://docs.datadoghq.com/agent). @@ -33,4 +33,3 @@ The HAProxy SPOA agent expose some configuration: |-----------------------|---------------|----------------------------------| | `DD_AGENT_HOST` | `localhost` | Host of a running Datadog Agent. | | `DD_TRACE_AGENT_PORT` | `8126` | Port of a running Datadog Agent. | - diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go b/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go index bb0c7c6342..4cd1c3c430 100644 --- a/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go +++ b/contrib/haproxy/stream-processing-offload/cmd/spoa/main.go @@ -22,6 +22,7 @@ import ( "github.com/DataDog/dd-trace-go/contrib/haproxy/stream-processing-offload/v2" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" "github.com/DataDog/dd-trace-go/v2/instrumentation" + "github.com/DataDog/dd-trace-go/v2/instrumentation/appsec/proxy" ) type haProxySpoaConfig struct { @@ -59,7 +60,7 @@ func loadConfig() haProxySpoaConfig { extensionHostStr := ipEnv("DD_HAPROXY_SPOA_HOST", net.IP{0, 0, 0, 0}).String() extensionPortInt := intEnv("DD_HAPROXY_SPOA_PORT", 3000) healthcheckPortInt := intEnv("DD_HAPROXY_SPOA_HEALTHCHECK_PORT", 3080) - bodyParsingSizeLimit := intEnv("DD_APPSEC_BODY_PARSING_SIZE_LIMIT", 0) + bodyParsingSizeLimit := intEnv("DD_APPSEC_BODY_PARSING_SIZE_LIMIT", proxy.DefaultBodyParsingSizeLimit) extensionPortStr := strconv.FormatInt(int64(extensionPortInt), 10) healthcheckPortStr := strconv.FormatInt(int64(healthcheckPortInt), 10) diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go b/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go index 20127563d8..22c460a62d 100644 --- a/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go +++ 
b/contrib/haproxy/stream-processing-offload/cmd/spoa/main_test.go @@ -9,6 +9,7 @@ import ( "os" "testing" + "github.com/DataDog/dd-trace-go/v2/instrumentation/appsec/proxy" "github.com/stretchr/testify/assert" ) @@ -80,7 +81,7 @@ func TestLoadConfig_VariousCases(t *testing.T) { { name: "defaults", env: nil, - want: want{"3000", "3080", "0.0.0.0", 0}, + want: want{"3000", "3080", "0.0.0.0", proxy.DefaultBodyParsingSizeLimit}, }, { name: "valid overrides", @@ -102,7 +103,7 @@ func TestLoadConfig_VariousCases(t *testing.T) { "DD_APPSEC_BODY_PARSING_SIZE_LIMIT": "notanint", "DD_HAPROXY_SPOA_HOST": "notanip", }, - want: want{"3000", "3080", "0.0.0.0", 0}, + want: want{"3000", "3080", "0.0.0.0", proxy.DefaultBodyParsingSizeLimit}, }, } diff --git a/contrib/haproxy/stream-processing-offload/haproxy.go b/contrib/haproxy/stream-processing-offload/haproxy.go index 6c47b8b20d..4bc85c4d95 100644 --- a/contrib/haproxy/stream-processing-offload/haproxy.go +++ b/contrib/haproxy/stream-processing-offload/haproxy.go @@ -46,7 +46,7 @@ func NewHAProxySPOA(config AppsecHAProxyConfig) *HAProxySPOA { return &HAProxySPOA{ messageProcessor: proxy.NewProcessor(proxy.ProcessorConfig{ BlockingUnavailable: config.BlockingUnavailable, - BodyParsingSizeLimit: config.BodyParsingSizeLimit, + BodyParsingSizeLimit: &config.BodyParsingSizeLimit, Framework: "haproxy/haproxy", Context: config.Context, ContinueMessageFunc: continueActionFunc, diff --git a/contrib/haproxy/stream-processing-offload/haproxy_messages.go b/contrib/haproxy/stream-processing-offload/haproxy_messages.go index 999e898a5c..a9d60f2f02 100644 --- a/contrib/haproxy/stream-processing-offload/haproxy_messages.go +++ b/contrib/haproxy/stream-processing-offload/haproxy_messages.go @@ -58,6 +58,10 @@ type messageRequestHeaders struct { hasBody bool } +func (m *messageRequestHeaders) BodyParsingSizeLimit(_ context.Context) int { + return proxy.DefaultBodyParsingSizeLimit +} + func (m *messageRequestHeaders) ExtractRequest(_ 
context.Context) (proxy.PseudoRequest, error) { headers, err := parseHAProxyReqHdrsBin(m.msg.Bytes(VarHeaders)) if err != nil { diff --git a/contrib/k8s.io/gateway-api/processor_types.go b/contrib/k8s.io/gateway-api/processor_types.go index 63daa84ce2..f81cbbd5e0 100644 --- a/contrib/k8s.io/gateway-api/processor_types.go +++ b/contrib/k8s.io/gateway-api/processor_types.go @@ -43,6 +43,10 @@ func (r requestHeader) SpanOptions(_ context.Context) []tracer.StartSpanOption { return r.spanOptions } +func (r requestHeader) BodyParsingSizeLimit(_ context.Context) int { + return proxy.DefaultBodyParsingSizeLimit +} + var _ proxy.HTTPBody = (*requestBody)(nil) type requestBody struct { diff --git a/contrib/k8s.io/gateway-api/request-mirror.go b/contrib/k8s.io/gateway-api/request-mirror.go index f65b1cd806..1bd71aa558 100644 --- a/contrib/k8s.io/gateway-api/request-mirror.go +++ b/contrib/k8s.io/gateway-api/request-mirror.go @@ -24,8 +24,7 @@ import ( ) const ( - maxBodyBytes = 5 * 1 << 20 // 5 MB - framework = "k8s.io/gateway-api" + framework = "k8s.io/gateway-api" ) var ( @@ -59,10 +58,11 @@ func HTTPRequestMirrorHandler(config Config) http.Handler { config.Hijack = ptr.To[bool](true) } + bodyProcessingMaxBytes := proxy.DefaultBodyParsingSizeLimit processor := proxy.NewProcessor(proxy.ProcessorConfig{ Context: context.Background(), BlockingUnavailable: true, - BodyParsingSizeLimit: maxBodyBytes, + BodyParsingSizeLimit: &bodyProcessingMaxBytes, Framework: framework, ContinueMessageFunc: func(_ context.Context, _ proxy.ContinueActionOptions) error { return nil }, BlockMessageFunc: func(_ context.Context, _ proxy.BlockActionOptions) error { return nil }, @@ -86,7 +86,7 @@ func HTTPRequestMirrorHandler(config Config) http.Handler { defer reqState.Close() - body, err := io.ReadAll(io.LimitReader(r.Body, maxBodyBytes+1)) + body, err := io.ReadAll(io.LimitReader(r.Body, int64(bodyProcessingMaxBytes+1))) if err := processor.OnRequestBody(requestBody{body: body}, &reqState); err != 
nil { logger.Error("Failed to process request body: %v", err) return diff --git a/instrumentation/appsec/proxy/config.go b/instrumentation/appsec/proxy/config.go index dc5adeaa1f..9d94813b91 100644 --- a/instrumentation/appsec/proxy/config.go +++ b/instrumentation/appsec/proxy/config.go @@ -9,6 +9,11 @@ import ( "context" ) +const ( + // DefaultBodyParsingSizeLimit is the default number of bytes parsed for body analysis. + DefaultBodyParsingSizeLimit = 10_000_000 // 10MB +) + // ContinueActionOptions contains options for the continue action created through [ProcessorConfig.ContinueMessageFunc]. type ContinueActionOptions struct { // HeaderMutations are the HTTP header mutations to be applied to the message (default is empty) @@ -34,7 +39,7 @@ type BlockActionOptions struct { type ProcessorConfig struct { context.Context BlockingUnavailable bool - BodyParsingSizeLimit int + BodyParsingSizeLimit *int Framework string // ContinueMessageFunc is a function that generates a continue message of type O based on the provided ContinueActionOptions. diff --git a/instrumentation/appsec/proxy/interfaces.go b/instrumentation/appsec/proxy/interfaces.go index 1004db221e..eb2cb20079 100644 --- a/instrumentation/appsec/proxy/interfaces.go +++ b/instrumentation/appsec/proxy/interfaces.go @@ -25,6 +25,9 @@ type RequestHeaders interface { // SpanOptions returns additional options to use when starting the span for this request. SpanOptions(context.Context) []tracer.StartSpanOption + + // BodyParsingSizeLimit returns the default value for body processing based on the request. + BodyParsingSizeLimit(ctx context.Context) int } // HTTPBody is an interface for accessing the body of an HTTP message used by the message processor. 
diff --git a/instrumentation/appsec/proxy/message_processor.go b/instrumentation/appsec/proxy/message_processor.go index 73480fde05..b5c8861f8e 100644 --- a/instrumentation/appsec/proxy/message_processor.go +++ b/instrumentation/appsec/proxy/message_processor.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "github.com/DataDog/dd-trace-go/v2/appsec" "github.com/DataDog/dd-trace-go/v2/instrumentation" @@ -27,15 +28,16 @@ type Processor struct { ProcessorConfig instr *instrumentation.Instrumentation - metrics *metrics - done context.CancelFunc - firstRequest sync.Once + metrics *metrics + done context.CancelFunc + firstRequest sync.Once + bodyParsingConfigured atomic.Bool } // NewProcessor creates a new [Processor] instance with the given configuration and instrumentation // It also initializes the metrics reporter and a context cancellation function func NewProcessor(config ProcessorConfig, instr *instrumentation.Instrumentation) Processor { - if config.BodyParsingSizeLimit <= 0 { + if config.BodyParsingSizeLimit != nil && *config.BodyParsingSizeLimit <= 0 { instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.") } @@ -57,10 +59,6 @@ func NewProcessor(config ProcessorConfig, instr *instrumentation.Instrumentation // along with an optional output message of type O created by either [ProcessorConfig.ContinueMessageFunc] or [ProcessorConfig.BlockMessageFunc] // If the request is blocked or the message ends the stream, it returns io.EOF as error func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (reqState RequestState, err error) { - mp.firstRequest.Do(func() { - mp.instr.Logger().Info("external_processing: first request received. 
Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.BodyParsingSizeLimit, mp.Framework) - }) - mp.metrics.incrementRequestCount() pseudoRequest, err := req.ExtractRequest(ctx) if err != nil { @@ -72,9 +70,22 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) ( return reqState, fmt.Errorf("error converting to net/http request: %w", err) } + mp.firstRequest.Do(func() { + if mp.BodyParsingSizeLimit == nil { + bodySizeLimit := req.BodyParsingSizeLimit(ctx) + mp.BodyParsingSizeLimit = &bodySizeLimit + } + mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, *mp.BodyParsingSizeLimit, mp.Framework) + }) + + var bodyParsingSizeLimit int + if mp.BodyParsingSizeLimit != nil { + bodyParsingSizeLimit = *mp.BodyParsingSizeLimit + } + reqState, blocked := newRequestState( httpRequest, - mp.BodyParsingSizeLimit, + bodyParsingSizeLimit, mp.Framework, req.SpanOptions(ctx)..., ) @@ -127,7 +138,7 @@ func (mp *Processor) OnRequestBody(req HTTPBody, reqState *RequestState) error { mp.instr.Logger().Debug("message_processor: received request body: %v - EOS: %v\n", len(req.GetBody()), req.GetEndOfStream()) - if mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeRequestBody { + if mp.BodyParsingSizeLimit == nil || *mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeRequestBody { mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. 
" + "Please refer to the official documentation for guidance on the proper settings or contact support.") @@ -211,7 +222,7 @@ func (mp *Processor) OnResponseBody(resp HTTPBody, reqState *RequestState) error mp.instr.Logger().Debug("message_processor: received response body: %v - EOS: %v\n", len(resp.GetBody()), resp.GetEndOfStream()) - if mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeResponseBody { + if mp.BodyParsingSizeLimit == nil || *mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeResponseBody { mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. " + "Please refer to the official documentation for guidance on the proper settings or contact support.") return io.EOF @@ -268,7 +279,7 @@ func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos b // isBodySupported checks if the body should be analyzed based on content type func (mp *Processor) isBodySupported(contentType string) bool { - if mp.BodyParsingSizeLimit <= 0 { + if mp.BodyParsingSizeLimit == nil || *mp.BodyParsingSizeLimit <= 0 { return false } From 90cada9c8fe008d50f9e601a5282cb34a0a494cb Mon Sep 17 00:00:00 2001 From: Flavien Darche Date: Fri, 24 Oct 2025 15:00:13 +0200 Subject: [PATCH 2/6] use the computed value instead of doing nil check everywhere --- .../appsec/proxy/message_processor.go | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/instrumentation/appsec/proxy/message_processor.go b/instrumentation/appsec/proxy/message_processor.go index b5c8861f8e..337fe88973 100644 --- a/instrumentation/appsec/proxy/message_processor.go +++ b/instrumentation/appsec/proxy/message_processor.go @@ -11,7 +11,6 @@ import ( "fmt" "io" "sync" - "sync/atomic" "github.com/DataDog/dd-trace-go/v2/appsec" "github.com/DataDog/dd-trace-go/v2/instrumentation" @@ -28,17 +27,21 @@ type Processor struct { ProcessorConfig instr *instrumentation.Instrumentation - metrics *metrics - done 
context.CancelFunc - firstRequest sync.Once - bodyParsingConfigured atomic.Bool + metrics *metrics + done context.CancelFunc + firstRequest sync.Once + computedBodyParsingSizeLimit int } // NewProcessor creates a new [Processor] instance with the given configuration and instrumentation // It also initializes the metrics reporter and a context cancellation function func NewProcessor(config ProcessorConfig, instr *instrumentation.Instrumentation) Processor { - if config.BodyParsingSizeLimit != nil && *config.BodyParsingSizeLimit <= 0 { - instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.") + computedBodyParsingSizeLimit := 0 + if config.BodyParsingSizeLimit != nil { + computedBodyParsingSizeLimit = *config.BodyParsingSizeLimit + if computedBodyParsingSizeLimit <= 0 { + instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.") + } } if config.Context == nil { @@ -47,10 +50,11 @@ func NewProcessor(config ProcessorConfig, instr *instrumentation.Instrumentation var done context.CancelFunc config.Context, done = context.WithCancel(config.Context) return Processor{ - ProcessorConfig: config, - instr: instr, - metrics: newMetricsReporter(config.Context, instr.Logger()), - done: done, + ProcessorConfig: config, + instr: instr, + metrics: newMetricsReporter(config.Context, instr.Logger()), + done: done, + computedBodyParsingSizeLimit: computedBodyParsingSizeLimit, } } @@ -72,20 +76,17 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) ( mp.firstRequest.Do(func() { if mp.BodyParsingSizeLimit == nil { - bodySizeLimit := req.BodyParsingSizeLimit(ctx) - mp.BodyParsingSizeLimit = &bodySizeLimit + mp.computedBodyParsingSizeLimit = req.BodyParsingSizeLimit(ctx) + if mp.computedBodyParsingSizeLimit <= 0 { + mp.instr.Logger().Info("external_processing: body parsing size limit set 
to 0 or negative. The request and response bodies will NOT be analyzed.") + } } - mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, *mp.BodyParsingSizeLimit, mp.Framework) + mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.computedBodyParsingSizeLimit, mp.Framework) }) - var bodyParsingSizeLimit int - if mp.BodyParsingSizeLimit != nil { - bodyParsingSizeLimit = *mp.BodyParsingSizeLimit - } - reqState, blocked := newRequestState( httpRequest, - bodyParsingSizeLimit, + mp.computedBodyParsingSizeLimit, mp.Framework, req.SpanOptions(ctx)..., ) @@ -138,7 +139,7 @@ func (mp *Processor) OnRequestBody(req HTTPBody, reqState *RequestState) error { mp.instr.Logger().Debug("message_processor: received request body: %v - EOS: %v\n", len(req.GetBody()), req.GetEndOfStream()) - if mp.BodyParsingSizeLimit == nil || *mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeRequestBody { + if mp.computedBodyParsingSizeLimit <= 0 || reqState.State != MessageTypeRequestBody { mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. " + "Please refer to the official documentation for guidance on the proper settings or contact support.") @@ -222,7 +223,7 @@ func (mp *Processor) OnResponseBody(resp HTTPBody, reqState *RequestState) error mp.instr.Logger().Debug("message_processor: received response body: %v - EOS: %v\n", len(resp.GetBody()), resp.GetEndOfStream()) - if mp.BodyParsingSizeLimit == nil || *mp.BodyParsingSizeLimit <= 0 || reqState.State != MessageTypeResponseBody { + if mp.computedBodyParsingSizeLimit <= 0 || reqState.State != MessageTypeResponseBody { mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. 
" + "Please refer to the official documentation for guidance on the proper settings or contact support.") return io.EOF @@ -279,7 +280,7 @@ func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos b // isBodySupported checks if the body should be analyzed based on content type func (mp *Processor) isBodySupported(contentType string) bool { - if mp.BodyParsingSizeLimit == nil || *mp.BodyParsingSizeLimit <= 0 { + if mp.computedBodyParsingSizeLimit <= 0 { return false } From e873ce21a651f5cd3d5ae5736490333c63bfa902 Mon Sep 17 00:00:00 2001 From: Flavien Darche Date: Fri, 24 Oct 2025 16:50:37 +0200 Subject: [PATCH 3/6] update readmes --- .../cmd/serviceextensions/README.md | 25 ++++++++++--------- .../cmd/spoa/README.md | 13 +++++----- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md index 3d98aed333..6d88a906da 100644 --- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md +++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md @@ -22,22 +22,23 @@ The ASM Service Extension expose some configuration. The configuration can be tw >**GCP requires that the default configuration for the Service Extension should not change.** -| Environment variable | Default value | Description | -|---|-----------------|---------------------------------------------------------------------------------------------------------------| -| `DD_SERVICE_EXTENSION_HOST` | `0.0.0.0` | Host on where the gRPC and HTTP server should listen to. | -| `DD_SERVICE_EXTENSION_PORT` | `443` | Port used by the gRPC Server.
Envoy Google backend’s is only using secure connection to Service Extension. | -| `DD_SERVICE_EXTENSION_HEALTHCHECK_PORT` | `80` | Port used for the HTTP server for the health check. | +| Environment variable | Default value | Description | +|-------------------------------------------|-----------------|---------------------------------------------------------------------------------------------------------------| +| `DD_SERVICE_EXTENSION_HOST` | `0.0.0.0` | Host on where the gRPC and HTTP server should listen to. | +| `DD_SERVICE_EXTENSION_PORT` | `443` | Port used by the gRPC Server.
Envoy Google backend’s is only using secure connection to Service Extension. | +| `DD_SERVICE_EXTENSION_HEALTHCHECK_PORT` | `80` | Port used for the HTTP server for the health check. | | `DD_SERVICE_EXTENSION_OBSERVABILITY_MODE` | `false` | Enable observability mode. This will process a request asynchronously (blocking would be disabled). | -| `DD_SERVICE_EXTENSION_TLS` | `true` | Enable the gRPC TLS layer. Do not modify if you are using GCP. | -| `DD_SERVICE_EXTENSION_TLS_KEY_FILE` | `localhost.key` | Change the default gRPC TLS layer key. Do not modify if you are using GCP. | -| `DD_SERVICE_EXTENSION_TLS_CERT_FILE` | `localhost.crt` | Change the default gRPC TLS layer cert. Do not modify if you are using GCP. | +| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10000000` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. | +| `DD_SERVICE_EXTENSION_TLS` | `true` | Enable the gRPC TLS layer. Do not modify if you are using GCP. | +| `DD_SERVICE_EXTENSION_TLS_KEY_FILE` | `localhost.key` | Change the default gRPC TLS layer key. Do not modify if you are using GCP. | +| `DD_SERVICE_EXTENSION_TLS_CERT_FILE` | `localhost.crt` | Change the default gRPC TLS layer cert. Do not modify if you are using GCP. | > The Service Extension need to be connected to a deployed [Datadog agent](https://docs.datadoghq.com/agent). -| Environment variable | Default value | Description | -|---|---|---| -| `DD_AGENT_HOST` | `N/A` | Host of a running Datadog Agent. | -| `DD_TRACE_AGENT_PORT` | `8126` | Port of a running Datadog Agent. | +| Environment variable | Default value | Description | +|-----------------------|---------------|----------------------------------| +| `DD_AGENT_HOST` | `N/A` | Host of a running Datadog Agent. | +| `DD_TRACE_AGENT_PORT` | `8126` | Port of a running Datadog Agent. 
| ### SSL Configuration diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md index 9c74b5aa7a..62b2326043 100644 --- a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md +++ b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md @@ -19,13 +19,12 @@ docker build -f contrib/haproxy/stream-processing-offload/cmd/spoa/Dockerfile -t The HAProxy SPOA agent expose some configuration: -| Environment variable | Default value | Description | -|-------------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------| -| `DD_HAPROXY_SPOA_HOST` | `0.0.0.0` | Host on where the SPOA and HTTP server should listen to. | -| `DD_HAPROXY_SPOA_PORT` | `3000` | Port used by the SPOA that accept communication with HAProxy. | -| `DD_HAPROXY_SPOA_HEALTHCHECK_PORT` | `3080` | Port used for the HTTP server for the health check. | -| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10000000` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. | -| +| Environment variable | Default value | Description | +|-------------------------------------|---------------|-------------------------------------------------------------------------------------------------| +| `DD_HAPROXY_SPOA_HOST` | `0.0.0.0` | Host on where the SPOA and HTTP server should listen to. | +| `DD_HAPROXY_SPOA_PORT` | `3000` | Port used by the SPOA that accept communication with HAProxy. | +| `DD_HAPROXY_SPOA_HEALTHCHECK_PORT` | `3080` | Port used for the HTTP server for the health check. | +| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10000000` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. | > The HAProxy SPOA need to be connected to a deployed [Datadog agent](https://docs.datadoghq.com/agent). 
From fb33a1eaf7c598d62652fc544c57e2edb564337d Mon Sep 17 00:00:00 2001 From: Flavien Darche Date: Fri, 24 Oct 2025 17:54:08 +0200 Subject: [PATCH 4/6] change computedBodyParsingSizeLimit to atomic value --- .../appsec/proxy/message_processor.go | 45 +++++++++---------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/instrumentation/appsec/proxy/message_processor.go b/instrumentation/appsec/proxy/message_processor.go index 337fe88973..28819308be 100644 --- a/instrumentation/appsec/proxy/message_processor.go +++ b/instrumentation/appsec/proxy/message_processor.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "github.com/DataDog/dd-trace-go/v2/appsec" "github.com/DataDog/dd-trace-go/v2/instrumentation" @@ -30,31 +31,22 @@ type Processor struct { metrics *metrics done context.CancelFunc firstRequest sync.Once - computedBodyParsingSizeLimit int + computedBodyParsingSizeLimit atomic.Int64 } // NewProcessor creates a new [Processor] instance with the given configuration and instrumentation // It also initializes the metrics reporter and a context cancellation function func NewProcessor(config ProcessorConfig, instr *instrumentation.Instrumentation) Processor { - computedBodyParsingSizeLimit := 0 - if config.BodyParsingSizeLimit != nil { - computedBodyParsingSizeLimit = *config.BodyParsingSizeLimit - if computedBodyParsingSizeLimit <= 0 { - instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. 
The request and response bodies will NOT be analyzed.") - } - } - if config.Context == nil { config.Context = context.Background() } var done context.CancelFunc config.Context, done = context.WithCancel(config.Context) return Processor{ - ProcessorConfig: config, - instr: instr, - metrics: newMetricsReporter(config.Context, instr.Logger()), - done: done, - computedBodyParsingSizeLimit: computedBodyParsingSizeLimit, + ProcessorConfig: config, + instr: instr, + metrics: newMetricsReporter(config.Context, instr.Logger()), + done: done, } } @@ -75,18 +67,23 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) ( } mp.firstRequest.Do(func() { - if mp.BodyParsingSizeLimit == nil { - mp.computedBodyParsingSizeLimit = req.BodyParsingSizeLimit(ctx) - if mp.computedBodyParsingSizeLimit <= 0 { - mp.instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.") - } + var bodyLimit int64 + if mp.BodyParsingSizeLimit != nil { + bodyLimit = int64(*mp.BodyParsingSizeLimit) + } else { + bodyLimit = int64(req.BodyParsingSizeLimit(ctx)) + } + mp.computedBodyParsingSizeLimit.Store(bodyLimit) + + if bodyLimit <= 0 { + mp.instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.") } - mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.computedBodyParsingSizeLimit, mp.Framework) + mp.instr.Logger().Info("external_processing: first request received. 
Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.computedBodyParsingSizeLimit.Load(), mp.Framework) }) reqState, blocked := newRequestState( httpRequest, - mp.computedBodyParsingSizeLimit, + int(mp.computedBodyParsingSizeLimit.Load()), mp.Framework, req.SpanOptions(ctx)..., ) @@ -139,7 +136,7 @@ func (mp *Processor) OnRequestBody(req HTTPBody, reqState *RequestState) error { mp.instr.Logger().Debug("message_processor: received request body: %v - EOS: %v\n", len(req.GetBody()), req.GetEndOfStream()) - if mp.computedBodyParsingSizeLimit <= 0 || reqState.State != MessageTypeRequestBody { + if mp.computedBodyParsingSizeLimit.Load() <= 0 || reqState.State != MessageTypeRequestBody { mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. " + "Please refer to the official documentation for guidance on the proper settings or contact support.") @@ -223,7 +220,7 @@ func (mp *Processor) OnResponseBody(resp HTTPBody, reqState *RequestState) error mp.instr.Logger().Debug("message_processor: received response body: %v - EOS: %v\n", len(resp.GetBody()), resp.GetEndOfStream()) - if mp.computedBodyParsingSizeLimit <= 0 || reqState.State != MessageTypeResponseBody { + if mp.computedBodyParsingSizeLimit.Load() <= 0 || reqState.State != MessageTypeResponseBody { mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. 
" + "Please refer to the official documentation for guidance on the proper settings or contact support.") return io.EOF @@ -280,7 +277,7 @@ func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos b // isBodySupported checks if the body should be analyzed based on content type func (mp *Processor) isBodySupported(contentType string) bool { - if mp.computedBodyParsingSizeLimit <= 0 { + if mp.computedBodyParsingSizeLimit.Load() <= 0 { return false } From f61a2f2c038d1d82bca4123d55d952bcff704054 Mon Sep 17 00:00:00 2001 From: Flavien Darche Date: Fri, 24 Oct 2025 17:59:27 +0200 Subject: [PATCH 5/6] using power of 2 --- .../envoyproxy/go-control-plane/cmd/serviceextensions/README.md | 2 +- contrib/haproxy/stream-processing-offload/cmd/spoa/README.md | 2 +- instrumentation/appsec/proxy/config.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md index 6d88a906da..89e87fee82 100644 --- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md +++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/README.md @@ -28,7 +28,7 @@ The ASM Service Extension expose some configuration. The configuration can be tw | `DD_SERVICE_EXTENSION_PORT` | `443` | Port used by the gRPC Server.
Envoy Google backend’s is only using secure connection to Service Extension. | | `DD_SERVICE_EXTENSION_HEALTHCHECK_PORT` | `80` | Port used for the HTTP server for the health check. | | `DD_SERVICE_EXTENSION_OBSERVABILITY_MODE` | `false` | Enable observability mode. This will process a request asynchronously (blocking would be disabled). | -| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10000000` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. | +| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10485760` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. | | `DD_SERVICE_EXTENSION_TLS` | `true` | Enable the gRPC TLS layer. Do not modify if you are using GCP. | | `DD_SERVICE_EXTENSION_TLS_KEY_FILE` | `localhost.key` | Change the default gRPC TLS layer key. Do not modify if you are using GCP. | | `DD_SERVICE_EXTENSION_TLS_CERT_FILE` | `localhost.crt` | Change the default gRPC TLS layer cert. Do not modify if you are using GCP. | diff --git a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md index 62b2326043..637bd508cd 100644 --- a/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md +++ b/contrib/haproxy/stream-processing-offload/cmd/spoa/README.md @@ -24,7 +24,7 @@ The HAProxy SPOA agent expose some configuration: | `DD_HAPROXY_SPOA_HOST` | `0.0.0.0` | Host on where the SPOA and HTTP server should listen to. | | `DD_HAPROXY_SPOA_PORT` | `3000` | Port used by the SPOA that accept communication with HAProxy. | | `DD_HAPROXY_SPOA_HEALTHCHECK_PORT` | `3080` | Port used for the HTTP server for the health check. | -| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10000000` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. | +| `DD_APPSEC_BODY_PARSING_SIZE_LIMIT` | `10485760` | Maximum size of the bodies to be processed in bytes. If set to 0, the bodies are not processed. 
| > The HAProxy SPOA need to be connected to a deployed [Datadog agent](https://docs.datadoghq.com/agent). diff --git a/instrumentation/appsec/proxy/config.go b/instrumentation/appsec/proxy/config.go index 9d94813b91..0f2249a21e 100644 --- a/instrumentation/appsec/proxy/config.go +++ b/instrumentation/appsec/proxy/config.go @@ -11,7 +11,7 @@ import ( const ( // DefaultBodyParsingSizeLimit is the default number of bytes parsed for body analysis. - DefaultBodyParsingSizeLimit = 10_000_000 // 10MB + DefaultBodyParsingSizeLimit = 10 * 1 << 20 // 10MB ) // ContinueActionOptions contains options for the continue action created through [ProcessorConfig.ContinueMessageFunc]. From 48767a0fe8b57f60a6612681579cda95ce453540 Mon Sep 17 00:00:00 2001 From: Flavien Darche Date: Fri, 24 Oct 2025 18:14:12 +0200 Subject: [PATCH 6/6] remove default integration body size limit check --- contrib/envoyproxy/go-control-plane/envoy.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/contrib/envoyproxy/go-control-plane/envoy.go b/contrib/envoyproxy/go-control-plane/envoy.go index 0953929262..5452f22b64 100644 --- a/contrib/envoyproxy/go-control-plane/envoy.go +++ b/contrib/envoyproxy/go-control-plane/envoy.go @@ -53,13 +53,7 @@ type appsecEnvoyExternalProcessorServer struct { // AppsecEnvoyExternalProcessorServer creates a new external processor server with AAP enabled func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer, config AppsecEnvoyConfig) envoyextproc.ExternalProcessorServer { switch config.Integration { - case GCPServiceExtensionIntegration: - case EnvoyIntegration, IstioIntegration, EnvoyGatewayIntegration: - // Set default body parsing size limit if not specified for non-default integrations - if config.BodyParsingSizeLimit == nil { - defaultBody := proxy.DefaultBodyParsingSizeLimit - config.BodyParsingSizeLimit = &defaultBody - } + case GCPServiceExtensionIntegration, EnvoyIntegration, IstioIntegration, 
EnvoyGatewayIntegration: default: instr.Logger().Error("external_processing: invalid proxy integration type %d. Defaulting to GCPServiceExtensionIntegration", config.Integration) config.Integration = GCPServiceExtensionIntegration