Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standalone AppSec improvements #3710

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 73 additions & 60 deletions tests/appsec/test_asm_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class AsmStandalone_UpstreamPropagation_Base:

# Enpoint that triggers an ASM event and a downstream request
requestdownstreamUrl = "/requestdownstream"
returnheadersUrl = "/returnheaders"

# Tested product
tested_product = None
Expand Down Expand Up @@ -84,12 +85,12 @@ def setup_product_is_enabled(self):
headers = {
"User-Agent": "Arachni/v1", # attack if APPSEC enabled
}
self.check_r = weblog.get(self.requestdownstreamUrl, headers=headers)
self.check_r = weblog.get(self.returnheadersUrl, headers=headers)

def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_minus_1(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212120
parent_id = 34343430
self.r = weblog.get(
"/requestdownstream",
headers={
Expand All @@ -112,8 +113,8 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_minus_1
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212120
assert trace[0]["trace_id"] == 1212121212121212120

# Some tracers use true while others use yes
assert any(
Expand All @@ -123,6 +124,7 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_minus_1

assert spans_checked == 1
# Downstream propagation is fully disabled in this case
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert "X-Datadog-Origin" not in downstream_headers
assert "X-Datadog-Parent-Id" not in downstream_headers
Expand All @@ -133,7 +135,7 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_minus_1
def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_0(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
parent_id = 34343431
self.r = weblog.get(
"/requestdownstream",
headers={
Expand Down Expand Up @@ -167,6 +169,7 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_0(self)

assert spans_checked == 1
# Downstream propagation is fully disabled in this case
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert "X-Datadog-Origin" not in downstream_headers
assert "X-Datadog-Parent-Id" not in downstream_headers
Expand All @@ -176,8 +179,8 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_0(self)

def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_1(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212122
parent_id = 34343432
self.r = weblog.get(
"/requestdownstream",
headers={
Expand All @@ -200,8 +203,8 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_1(self)
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212122
assert trace[0]["trace_id"] == 1212121212121212122

# Some tracers use true while others use yes
assert any(
Expand All @@ -211,6 +214,7 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_1(self)

assert spans_checked == 1
# Downstream propagation is fully disabled in this case
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert "X-Datadog-Origin" not in downstream_headers
assert "X-Datadog-Parent-Id" not in downstream_headers
Expand All @@ -220,8 +224,8 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_1(self)

def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_2(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212123
parent_id = 34343433
self.r = weblog.get(
"/requestdownstream",
headers={
Expand All @@ -244,8 +248,8 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_2(self)
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212123
assert trace[0]["trace_id"] == 1212121212121212123

# Some tracers use true while others use yes
assert any(
Expand All @@ -255,6 +259,7 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_2(self)

assert spans_checked == 1
# Downstream propagation is fully disabled in this case
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert "X-Datadog-Origin" not in downstream_headers
assert "X-Datadog-Parent-Id" not in downstream_headers
Expand All @@ -263,7 +268,7 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_2(self)
assert "X-Datadog-Trace-Id" not in downstream_headers

def setup_no_upstream_appsec_propagation__with_asm_event__is_kept_with_priority_2__from_minus_1(self):
trace_id = 1212121212121212121
trace_id = 1212121212121212124
parent_id = 34343434
self.r = weblog.get(
self.requestdownstreamUrl,
Expand All @@ -287,8 +292,8 @@ def test_no_upstream_appsec_propagation__with_asm_event__is_kept_with_priority_2
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212124
assert trace[0]["trace_id"] == 1212121212121212124

# Some tracers use true while others use yes
assert any(
Expand All @@ -297,17 +302,18 @@ def test_no_upstream_appsec_propagation__with_asm_event__is_kept_with_priority_2
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert "_dd.p.other=1" in downstream_headers["X-Datadog-Tags"]
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] == "2"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212124"

def setup_no_upstream_appsec_propagation__with_asm_event__is_kept_with_priority_2__from_0(self):
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212125
parent_id = 34343435
self.r = weblog.get(
self.requestdownstreamUrl,
headers={
Expand All @@ -331,8 +337,8 @@ def test_no_upstream_appsec_propagation__with_asm_event__is_kept_with_priority_2
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212125
assert trace[0]["trace_id"] == 1212121212121212125

# Some tracers use true while others use yes
assert any(
Expand All @@ -341,18 +347,19 @@ def test_no_upstream_appsec_propagation__with_asm_event__is_kept_with_priority_2
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343435"
assert "_dd.p.other=1" in downstream_headers["X-Datadog-Tags"]
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] == "2"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212125"

def setup_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_0(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212126
parent_id = 34343436
self.r = weblog.get(
"/requestdownstream",
headers={
Expand All @@ -375,8 +382,8 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_0
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212126
assert trace[0]["trace_id"] == 1212121212121212126

# Some tracers use true while others use yes
assert any(
Expand All @@ -385,17 +392,18 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_0
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343436"
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] in ["0", "2"]
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212126"

def setup_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_1(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212127
parent_id = 34343437
self.r = weblog.get(
"/requestdownstream",
headers={
Expand All @@ -418,8 +426,8 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_1
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212127
assert trace[0]["trace_id"] == 1212121212121212127

# Some tracers use true while others use yes
assert any(
Expand All @@ -428,17 +436,18 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_1
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343437"
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] in ["1", "2"]
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212127"

def setup_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_2(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212128
parent_id = 34343438
self.r = weblog.get(
"/requestdownstream",
headers={
Expand All @@ -461,8 +470,8 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_2
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212128
assert trace[0]["trace_id"] == 1212121212121212128

# Some tracers use true while others use yes
assert any(
Expand All @@ -471,16 +480,17 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_2
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343438"
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] == "2"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212128"

def setup_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_minus_1(self):
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212129
parent_id = 34343439
self.r = weblog.get(
self.requestdownstreamUrl,
headers={
Expand All @@ -502,8 +512,8 @@ def test_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_mi
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212129
assert trace[0]["trace_id"] == 1212121212121212129

# Some tracers use true while others use yes
assert any(
Expand All @@ -512,16 +522,17 @@ def test_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_mi
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343439"
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] == "2"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212129"

def setup_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_0(self):
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212110
parent_id = 34343410
self.r = weblog.get(
self.requestdownstreamUrl,
headers={
Expand All @@ -543,8 +554,8 @@ def test_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_0(
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212110
assert trace[0]["trace_id"] == 1212121212121212110

# Some tracers use true while others use yes
assert any(
Expand All @@ -553,16 +564,17 @@ def test_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_0(
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343410"
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] == "2"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212110"

def setup_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_1(self):
trace_id = 1212121212121212121
parent_id = 34343434
trace_id = 1212121212121212111
parent_id = 34343411
self.r = weblog.get(
self.requestdownstreamUrl,
headers={
Expand All @@ -584,8 +596,8 @@ def test_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_1(
assert self._assert_tags(trace[0], span, "metrics", tested_metrics)

assert span["metrics"]["_dd.apm.enabled"] == 0 # if key missing -> APPSEC-55222
assert span["trace_id"] == 1212121212121212121
assert trace[0]["trace_id"] == 1212121212121212121
assert span["trace_id"] == 1212121212121212111
assert trace[0]["trace_id"] == 1212121212121212111

# Some tracers use true while others use yes
assert any(
Expand All @@ -594,12 +606,13 @@ def test_any_upstream_propagation__with_asm_event__raises_priority_to_2__from_1(
spans_checked += 1

assert spans_checked == 1
assert self.r.text
downstream_headers = CaseInsensitiveDict(json.loads(self.r.text))
assert downstream_headers["X-Datadog-Origin"] == "rum"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343434"
assert downstream_headers["X-Datadog-Parent-Id"] != "34343411"
assert "_dd.p.appsec=1" in downstream_headers["X-Datadog-Tags"]
assert downstream_headers["X-Datadog-Sampling-Priority"] == "2"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212111"


@rfc("https://docs.google.com/document/d/12NBx-nD-IoQEMiCRnJXneq4Be7cbtSc6pJLOFUWTpNE/edit")
Expand Down
Loading