Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@ public AppSecSpanPostProcessor(ApiSecuritySampler sampler, EventProducerService

@Override
public void process(@Nonnull AgentSpan span, @Nonnull BooleanSupplier timeoutCheck) {
final RequestContext ctx_ = span.getRequestContext();
if (ctx_ == null) {
return;
}
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return;
}

if (!ctx.isKeepOpenForApiSecurityPostProcessing()) {
return;
}
AppSecRequestContext ctx = null;
RequestContext ctx_ = null;
boolean needsRelease = false;

try {
ctx_ = span.getRequestContext();
if (ctx_ == null) {
return;
}
ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return;
}

// Check if we acquired a permit for this request - must be inside try to ensure finally runs
needsRelease = ctx.isKeepOpenForApiSecurityPostProcessing();
if (!needsRelease) {
return;
}

if (timeoutCheck.getAsBoolean()) {
log.debug("Timeout detected, skipping API security post-processing");
return;
Expand All @@ -56,17 +62,25 @@ public void process(@Nonnull AgentSpan span, @Nonnull BooleanSupplier timeoutChe
log.debug("Request sampled, processing API security post-processing");
extractSchemas(ctx, ctx_.getTraceSegment());
} finally {
ctx.setKeepOpenForApiSecurityPostProcessing(false);
try {
// XXX: Close the additive first. This is not strictly needed, but it'll prevent getting it
// detected as a
// missed request-ended event.
ctx.closeWafContext();
ctx.close();
} catch (Exception e) {
log.debug("Error closing AppSecRequestContext", e);
// Always release the semaphore permit if we acquired one
if (needsRelease) {
if (ctx != null) {
ctx.setKeepOpenForApiSecurityPostProcessing(false);
// XXX: Close the additive first. This is not strictly needed, but it'll prevent getting
// it detected as a missed request-ended event.
try {
ctx.closeWafContext();
} catch (Exception e) {
log.debug("Error closing WAF context", e);
}
try {
ctx.close();
} catch (Exception e) {
log.debug("Error closing AppSecRequestContext", e);
}
}
sampler.releaseOne();
}
sampler.releaseOne();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,10 +836,32 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
TraceSegment traceSeg = ctx_.getTraceSegment();
Map<String, Object> tags = spanInfo.getTags();

if (maybeSampleForApiSecurity(ctx, spanInfo, tags)) {
if (!Config.get().isApmTracingEnabled()) {
// Log upstream propagated tags
Object upstreamPropagatedTs = tags.get(Tags.PROPAGATED_TRACE_SOURCE);
log.info(
"[APPSEC-57815] Request ended - spanId={}, upstream _dd.p.ts={}",
spanInfo.getSpanId(),
upstreamPropagatedTs);

boolean sampledForApiSec = maybeSampleForApiSecurity(ctx, spanInfo, tags);
boolean apmTracingEnabled = Config.get().isApmTracingEnabled();

log.info(
"[APPSEC-57815] sampledForApiSec={}, apmTracingEnabled={}",
sampledForApiSec,
apmTracingEnabled);

if (sampledForApiSec) {
if (!apmTracingEnabled) {
log.info(
"[APPSEC-57815] Setting ASM_KEEP=true (API Security sampled, APM tracing disabled)");
traceSeg.setTagTop(Tags.ASM_KEEP, true);
// Must set _dd.p.ts locally so TraceCollector respects force-keep in standalone mode
// (TraceCollector.java lines 67-74 ignore force-keep without _dd.p.ts when APM disabled)
traceSeg.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM);
// Verify the tag was set
Object asmKeepAfterSet = traceSeg.getTagTop(Tags.ASM_KEEP);
log.info("[APPSEC-57815] ASM_KEEP after setTagTop: {}", asmKeepAfterSet);
}
} else {
ctx.closeWafContext();
Expand All @@ -852,6 +874,8 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {

Collection<AppSecEvent> collectedEvents = ctx.transferCollectedEvents();

log.info("[APPSEC-57815] Collected {} AppSec events", collectedEvents.size());

for (TraceSegmentPostProcessor pp : this.traceSegmentPostProcessors) {
pp.processTraceSegment(traceSeg, ctx, collectedEvents);
}
Expand All @@ -863,8 +887,12 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {

// If detected any events - mark span at appsec.event
if (!collectedEvents.isEmpty()) {
if (ctx.isManuallyKept()) {
boolean manuallyKept = ctx.isManuallyKept();
log.info("[APPSEC-57815] AppSec events detected - manuallyKept={}", manuallyKept);
if (manuallyKept) {
// Set asm keep in case that root span was not available when events are detected
log.info(
"[APPSEC-57815] Setting ASM_KEEP=true and _dd.p.ts=ASM (AppSec events + manually kept)");
traceSeg.setTagTop(Tags.ASM_KEEP, true);
traceSeg.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM);
}
Expand Down Expand Up @@ -928,6 +956,15 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
);
}

// Log final state of propagation tags from TraceSegment (not from spanInfo.getTags() which is
// immutable)
Object finalPropagatedTs = traceSeg.getTagTop(Tags.PROPAGATED_TRACE_SOURCE);
Object finalAsmKeep = traceSeg.getTagTop(Tags.ASM_KEEP);
log.info(
"[APPSEC-57815] Request ended - final state from TraceSegment: _dd.p.ts={}, _dd.appsec.keep={}",
finalPropagatedTs,
finalAsmKeep);

ctx.close();
return NoopFlow.INSTANCE;
}
Expand All @@ -941,7 +978,10 @@ private boolean maybeSampleForApiSecurity(
ctx.setRoute(route.toString());
}
ApiSecuritySampler requestSampler = requestSamplerSupplier.get();
return requestSampler.preSampleRequest(ctx);
boolean sampled = requestSampler.preSampleRequest(ctx);
log.info(
"[APPSEC-57815] API Security sampling decision - route={}, sampled={}", route, sampled);
return sampled;
}

private Flow<Void> onRequestHeadersDone(RequestContext ctx_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,85 @@ class AppSecSpanPostProcessorTest extends DDSpecification {
1 * sampler.releaseOne()
0 * _
}

void 'permit is released even if extractSchemas throws exception'() {
given:
def sampler = Mock(ApiSecuritySamplerImpl)
def producer = Mock(EventProducerService)
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def ctx = Mock(AppSecRequestContext)
def processor = new AppSecSpanPostProcessor(sampler, producer)

when:
processor.process(span, { false })

then:
def ex = thrown(RuntimeException)
ex.message == "Unexpected error"
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> ctx
1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true
1 * sampler.sampleRequest(_) >> true
1 * reqCtx.getTraceSegment() >> { throw new RuntimeException("Unexpected error") }
1 * ctx.setKeepOpenForApiSecurityPostProcessing(false)
1 * ctx.closeWafContext()
1 * ctx.close()
1 * sampler.releaseOne() // Critical: permit is still released despite exception
0 * _
}

void 'multiple requests do not exhaust semaphore permits'() {
given:
// Use real ApiSecuritySamplerImpl which has a semaphore with 4 permits
def realSampler = new ApiSecuritySamplerImpl()
def producer = Mock(EventProducerService)
def processor = new AppSecSpanPostProcessor(realSampler, producer)

when: 'Process 5 consecutive requests that acquire permits'
5.times { i ->
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def ctx = Mock(AppSecRequestContext)

// Mock the interactions
span.getRequestContext() >> reqCtx
reqCtx.getData(_) >> ctx
ctx.isKeepOpenForApiSecurityPostProcessing() >> true
ctx.setKeepOpenForApiSecurityPostProcessing(false)
ctx.closeWafContext()
ctx.close()

// Process should complete without issues, releasing permit each time
processor.process(span, { false })
}

then: 'All requests complete successfully without permit exhaustion'
noExceptionThrown()
}

void 'permit is released when ctx cleanup operations fail'() {
given:
def sampler = Mock(ApiSecuritySamplerImpl)
def producer = Mock(EventProducerService)
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def ctx = Mock(AppSecRequestContext)
def processor = new AppSecSpanPostProcessor(sampler, producer)

when:
processor.process(span, { false })

then:
noExceptionThrown()
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> ctx
1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true
1 * sampler.sampleRequest(_) >> false
1 * ctx.setKeepOpenForApiSecurityPostProcessing(false)
1 * ctx.closeWafContext() >> { throw new RuntimeException("WAF context close failed") }
1 * ctx.close() >> { throw new RuntimeException("Context close failed") }
1 * sampler.releaseOne() // Critical: permit is still released despite cleanup failures
0 * _
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ class GatewayBridgeSpecification extends DDSpecification {
1 * spanInfo.getTags() >> TagMap.fromMap(['http.route': 'route'])
1 * requestSampler.preSampleRequest(_) >> true
1 * traceSegment.setTagTop(Tags.ASM_KEEP, true)
1 * traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM)
0 * traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM)
}


Expand Down
Loading