-
-
Notifications
You must be signed in to change notification settings - Fork 109
/
Copy pathtest_finalization_tranformations.py
133 lines (112 loc) · 3.55 KB
/
test_finalization_tranformations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import pytest
from sigma.exceptions import SigmaConfigurationError, SigmaTransformationError
from sigma.processing.finalization import (
ConcatenateQueriesFinalizer,
NestedFinalizer,
TemplateFinalizer,
)
from .test_processing_transformations import dummy_pipeline, sigma_rule
def test_finalization_multiple_pipeline_set(dummy_pipeline):
finalizer = ConcatenateQueriesFinalizer()
finalizer.set_pipeline(dummy_pipeline)
with pytest.raises(SigmaTransformationError, match="Pipeline.*already set"):
finalizer.set_pipeline(dummy_pipeline)
def test_concatenate_queries_tranformation(dummy_pipeline):
transformation = ConcatenateQueriesFinalizer(separator="', '", prefix="('", suffix="')")
transformation.set_pipeline(dummy_pipeline)
assert (
transformation.apply(['field1="value1"', 'field2="value2"'])
== """('field1="value1"', 'field2="value2"')"""
)
def test_template_transformation(dummy_pipeline):
dummy_pipeline.state["setting"] = "value"
transformation = TemplateFinalizer(
"""
[config]
setting = {{ pipeline.state.setting }}
[queries]{% for query in queries %}
query{{ loop.index }} = {{ query }}{% endfor %}
"""
)
transformation.set_pipeline(dummy_pipeline)
assert (
transformation.apply(
[
"fieldA=val1",
"fieldB=val2",
"fieldC=val3",
],
)
== """
[config]
setting = value
[queries]
query1 = fieldA=val1
query2 = fieldB=val2
query3 = fieldC=val3"""
)
def test_template_transformation_from_file(dummy_pipeline):
dummy_pipeline.state["setting"] = "value"
transformation = TemplateFinalizer(template="finalize.j2", path="tests/files")
transformation.set_pipeline(dummy_pipeline)
assert (
transformation.apply(
[
"fieldA=val1",
"fieldB=val2",
"fieldC=val3",
],
)
== """[config]
setting = value
[queries]
query1 = fieldA=val1
query2 = fieldB=val2
query3 = fieldC=val3"""
)
@pytest.fixture
def nested_finalizer(dummy_pipeline):
nested_finalizer = NestedFinalizer(
finalizers=[
ConcatenateQueriesFinalizer(separator="', '", prefix="('", suffix="')"),
TemplateFinalizer("allOf({{ queries }})"),
]
)
nested_finalizer.set_pipeline(dummy_pipeline)
return nested_finalizer
def test_nested_finalizer_from_dict(nested_finalizer):
NestedFinalizer.from_dict(
{
"finalizers": [
{
"type": "concat",
"separator": "', '",
"prefix": "('",
"suffix": "')",
},
{
"type": "template",
"template": "allOf({{ queries }})",
},
]
}
) == nested_finalizer
def test_nested_finalizer_no_finalizers():
with pytest.raises(
SigmaConfigurationError, match="Nested finalizer requires a 'finalizers' key."
):
NestedFinalizer.from_dict({})
def test_nested_finalizer_no_type():
with pytest.raises(SigmaConfigurationError, match="Finalizer type not specified"):
NestedFinalizer.from_dict({"finalizers": [{"foo": "bar"}]})
def test_nested_finalizer_apply(nested_finalizer):
assert (
nested_finalizer.apply(
[
"fieldA=val1",
"fieldB=val2",
"fieldC=val3",
],
)
== """allOf(('fieldA=val1', 'fieldB=val2', 'fieldC=val3'))"""
)