import pytest
from sigma.exceptions import SigmaConfigurationError
from sigma.processing.pipeline import ProcessingPipeline, QueryPostprocessingItem
from sigma.processing.postprocessing import (
EmbedQueryInJSONTransformation,
EmbedQueryTransformation,
NestedQueryPostprocessingTransformation,
QuerySimpleTemplateTransformation,
QueryTemplateTransformation,
ReplaceQueryTransformation,
)
from sigma.rule import SigmaRule
from .test_processing_transformations import dummy_pipeline, sigma_rule
def test_embed_query_transformation(dummy_pipeline, sigma_rule):
    """EmbedQueryTransformation surrounds the query with the configured prefix and suffix."""
    embed = EmbedQueryTransformation("[ ", " ]")
    embed.set_pipeline(dummy_pipeline)
    wrapped = embed.apply(sigma_rule, "field=value")
    assert wrapped == "[ field=value ]"
def test_embed_query_transformation_none(dummy_pipeline, sigma_rule):
    """With neither prefix nor suffix configured the query passes through unchanged."""
    embed = EmbedQueryTransformation()
    embed.set_pipeline(dummy_pipeline)
    original_query = "field=value"
    assert embed.apply(sigma_rule, original_query) == original_query
def test_query_simple_template_transformation(
    dummy_pipeline: ProcessingPipeline, sigma_rule: SigmaRule
):
    """`{...}` placeholders are resolved against the rule, the query and the pipeline state.

    `{rule.title}` reads a rule attribute, `{query}` is the incoming query and
    `{pipeline.state[test]}` reads a key of the pipeline's state dictionary.
    """
    template = """
title = {rule.title}
query = {query}
state = {pipeline.state[test]}
"""
    expected = """
title = Test
query = field="value"
state = teststate
"""
    simple_template = QuerySimpleTemplateTransformation(template)
    simple_template.set_pipeline(dummy_pipeline)
    # State is written after set_pipeline(); the template reads it at apply() time.
    dummy_pipeline.state["test"] = "teststate"
    assert simple_template.apply(sigma_rule, 'field="value"') == expected
def test_query_template_transformation(dummy_pipeline: ProcessingPipeline, sigma_rule: SigmaRule):
    """`{{ ... }}` template expressions see the rule, the query and the pipeline state.

    Unlike the simple template, the state key is accessed with attribute syntax
    (`pipeline.state.test`).
    """
    template = """
title = {{ rule.title }}
query = {{ query }}
state = {{ pipeline.state.test }}
"""
    expected = """
title = Test
query = field="value"
state = teststate
"""
    templated = QueryTemplateTransformation(template)
    templated.set_pipeline(dummy_pipeline)
    # State is written after set_pipeline(); the template reads it at apply() time.
    dummy_pipeline.state["test"] = "teststate"
    assert templated.apply(sigma_rule, 'field="value"') == expected
def test_embed_query_in_json_transformation_dict(dummy_pipeline, sigma_rule):
    """The `%QUERY%` placeholder in a JSON object is filled and the result re-serialized.

    The expected output shows the query's double quotes escaped and the JSON
    whitespace normalized, i.e. the query becomes a JSON string value instead of
    being spliced into the template text verbatim.
    """
    json_template = '{ "field": "value", "query": "%QUERY%" }'
    embed_json = EmbedQueryInJSONTransformation(json_template)
    embed_json.set_pipeline(dummy_pipeline)
    expected = '{"field": "value", "query": "field=\\"value\\""}'
    assert embed_json.apply(sigma_rule, 'field="value"') == expected
def test_embed_query_in_json_transformation_list(dummy_pipeline, sigma_rule):
    """`%QUERY%` is also replaced when it is an element of a JSON array.

    The surrounding array elements are preserved and the query is emitted as an
    escaped JSON string, as in the dict variant.
    """
    json_template = '{ "field": "value", "query": ["foo", "%QUERY%", "bar"] }'
    embed_json = EmbedQueryInJSONTransformation(json_template)
    embed_json.set_pipeline(dummy_pipeline)
    expected = '{"field": "value", "query": ["foo", "field=\\"value\\"", "bar"]}'
    assert embed_json.apply(sigma_rule, 'field="value"') == expected
def test_replace_query_transformation(dummy_pipeline, sigma_rule):
    """Regular-expression matches in the query (`v\\w+e` here) are substituted."""
    replace = ReplaceQueryTransformation("v\\w+e", "replaced")
    replace.set_pipeline(dummy_pipeline)
    rewritten = replace.apply(sigma_rule, 'field="value"')
    assert rewritten == 'field="replaced"'
@pytest.fixture
def nested_query_postprocessing_transformation(dummy_pipeline):
    """Nested transformation made of three items: replace, embed (id "test"), simple template.

    Only the embed item carries an identifier, so it is the one the rule can later
    report as having been processed by.
    """
    replace_item = QueryPostprocessingItem(ReplaceQueryTransformation("foo", "bar"))
    embed_item = QueryPostprocessingItem(EmbedQueryTransformation("[", "]"), identifier="test")
    template_item = QueryPostprocessingItem(
        QuerySimpleTemplateTransformation("title = {rule.title}\nquery = {query}")
    )
    nested = NestedQueryPostprocessingTransformation(
        items=[replace_item, embed_item, template_item]
    )
    nested.set_pipeline(dummy_pipeline)
    return nested
def test_nested_query_postprocessing_transformation_from_dict(
    nested_query_postprocessing_transformation,
):
    """from_dict() on the YAML-style config yields an object equal to the fixture.

    Each entry of "items" selects a transformation by its "type" key; the "id"
    key becomes the item identifier.
    """
    config = {
        "items": [
            {"type": "replace", "pattern": "foo", "replacement": "bar"},
            {"type": "embed", "prefix": "[", "suffix": "]", "id": "test"},
            {
                "type": "simple_template",
                "template": "title = {rule.title}\nquery = {query}",
            },
        ],
    }
    parsed = NestedQueryPostprocessingTransformation.from_dict(config)
    assert parsed == nested_query_postprocessing_transformation
def test_nested_query_postprocessing_transformation_no_items():
    """A configuration lacking the 'items' key is rejected with SigmaConfigurationError."""
    expected_message = "Nested post-processing transformation requires an 'items' key."
    with pytest.raises(SigmaConfigurationError, match=expected_message):
        NestedQueryPostprocessingTransformation.from_dict({})
def test_nested_query_postprocessing_transformation(
    nested_query_postprocessing_transformation, sigma_rule
):
    """Items are applied in order and the identified item is recorded on the rule.

    "foo" is replaced before the query is embedded in brackets and finally
    rendered through the template, which the expected output reflects.
    """
    nested = nested_query_postprocessing_transformation
    output = nested.apply(sigma_rule, 'field="foobar"')
    assert output == 'title = Test\nquery = [field="barbar"]'
    # The embed item was registered with identifier "test" in the fixture.
    assert sigma_rule.was_processed_by("test")