-
-
Notifications
You must be signed in to change notification settings - Fork 109
/
Copy pathtest_filters.py
306 lines (248 loc) · 9.71 KB
/
test_filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import copy
import uuid
from pathlib import Path
from typing import Callable
import pytest
from sigma.collection import SigmaCollection
from sigma.correlations import SigmaRuleReference
from sigma.exceptions import (
SigmaLogsourceError,
SigmaDetectionError,
SigmaTitleError,
SigmaFilterConditionError,
SigmaFilterError,
SigmaFilterRuleReferenceError,
)
from sigma.filters import SigmaFilter, SigmaGlobalFilter
from sigma.processing.conditions import LogsourceCondition
from sigma.processing.pipeline import ProcessingItem
from sigma.processing.transformations import FieldMappingTransformation
from sigma.rule import SigmaLogSource
from .test_conversion_base import test_backend
@pytest.fixture
def sigma_filter():
    """Provide a ``SigmaFilter`` parsed from an inline YAML document.

    The document declares a ``process_creation``/``windows`` logsource and a ``filter`` section
    holding two referenced rule IDs, a ``selection`` on ``User|startswith: 'adm_'`` and the
    condition ``not selection``.
    """
    # The YAML nesting is significant: category/product belong to ``logsource`` and
    # rules/selection/condition belong to ``filter``. This is the structure that
    # test_filter_valid compares against.
    return SigmaFilter.from_yaml(
        """
title: Filter Administrator account
description: The valid administrator account start with adm_
logsource:
    category: process_creation
    product: windows
filter:
    rules:
        - 6f3e2987-db24-4c78-a860-b4f4095a7095 # Data Compressed - rar.exe
        - df0841c0-9846-4e9f-ad8a-7df91571771b # Login on jump host
    selection:
        User|startswith: 'adm_'
    condition: not selection
"""
    )
@pytest.fixture
def rule_collection():
    """Provide a ``SigmaCollection`` holding a single "Failed login" rule.

    The rule's ID equals the first ID referenced by the ``sigma_filter`` fixture, and its
    logsource is ``process_creation``/``windows``.
    """
    # ``selection`` is a list of two single-field maps nested under ``detection``; the tests in
    # this module expect it to convert to "(EventID=4625 or EventID2=4624)".
    return SigmaCollection.from_yaml(
        """
title: Failed login
id: 6f3e2987-db24-4c78-a860-b4f4095a7095
name: failed_login
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        - EventID: 4625
        - EventID2: 4624
    condition: selection
"""
    )
@pytest.fixture
def event_count_correlation_rule():
    """Provide a ``SigmaCollection`` with a base rule and an ``event_count`` correlation over it.

    The first YAML document is the "Failed logon" rule (named ``failed_logon``); its ID equals
    the second ID referenced by the ``sigma_filter`` fixture. The second document is a
    correlation that references ``failed_logon`` by name, groups by three fields, uses a
    ``5m`` timespan and the condition ``gte: 10``.
    """
    # The ``---`` separator stays at column 0 so that it is read as a YAML document marker.
    return SigmaCollection.from_yaml(
        """
title: Failed logon
name: failed_logon
id: df0841c0-9846-4e9f-ad8a-7df91571771b
status: test
logsource:
    product: windows
    category: process_creation
detection:
    selection:
        EventID: 4625
    condition: selection
---
title: Multiple failed logons for a single user (possible brute force attack)
status: test
correlation:
    type: event_count
    rules:
        - failed_logon
    group-by:
        - TargetUserName
        - TargetDomainName
        - fieldB
    timespan: 5m
    condition:
        gte: 10
"""
    )
def test_filter_valid(sigma_filter):
    """Check the type, title, description, logsource and filter of the parsed fixture."""
    expected_logsource = SigmaLogSource.from_dict(
        {"category": "process_creation", "product": "windows"}
    )
    expected_global_filter = SigmaGlobalFilter.from_dict(
        {
            "rules": [
                "6f3e2987-db24-4c78-a860-b4f4095a7095",
                "df0841c0-9846-4e9f-ad8a-7df91571771b",
            ],
            "selection": {"User|startswith": "adm_"},
            "condition": "not selection",
        }
    )

    assert isinstance(sigma_filter, SigmaFilter)
    assert sigma_filter.title == "Filter Administrator account"
    assert sigma_filter.description == "The valid administrator account start with adm_"
    assert sigma_filter.logsource == expected_logsource
    assert sigma_filter.filter == expected_global_filter
def test_basic_filter_application(sigma_filter, test_backend, rule_collection):
    """Convert a collection of one rule plus the filter and expect the filter clause appended."""
    rule_collection.rules.append(sigma_filter)

    queries = test_backend.convert(rule_collection)

    assert queries == ['(EventID=4625 or EventID2=4624) and not User startswith "adm_"']
def test_basic_filter_application_against_correlation_rule(
    sigma_filter, test_backend, event_count_correlation_rule
):
    """Convert the correlation fixture plus the filter and compare the single resulting query.

    The expected query carries the filter clause on the base rule's search line, followed by the
    aggregate and where stages of the correlation.
    """
    event_count_correlation_rule.rules.append(sigma_filter)

    expected_query = (
        'EventID=4625 and not User startswith "adm_"\n'
        "| aggregate window=5min count() as event_count by TargetUserName, "
        "TargetDomainName, mappedB\n"
        "| where event_count >= 10"
    )

    assert test_backend.convert(event_count_correlation_rule) == [expected_query]
def test_filter_application_to_several_rules(sigma_filter, test_backend, rule_collection):
    """Reference a second, cloned rule from the filter and expect both queries to be filtered."""
    # Clone the existing rule under a fresh ID and register that ID with the filter.
    cloned_rule = copy.deepcopy(rule_collection.rules[0])
    cloned_rule.id = uuid.UUID("257f7780-ea6c-48d4-ae8e-2b95b3740d84")
    sigma_filter.filter.rules.append(SigmaRuleReference(str(cloned_rule.id)))

    rule_collection.rules.extend([cloned_rule, sigma_filter])

    filtered_query = '(EventID=4625 or EventID2=4624) and not User startswith "adm_"'
    assert test_backend.convert(rule_collection) == [filtered_query, filtered_query]
def test_reducing_rule_collections(sigma_filter, test_backend, rule_collection):
    """Expect ``resolve_rule_references`` to shrink the collection from two entries to one."""
    rule_collection.rules.append(sigma_filter)
    entries_before = len(rule_collection.rules)
    assert entries_before == 2

    # Applies / flattens all the filters onto the rules in processing
    rule_collection.resolve_rule_references()

    entries_after = len(rule_collection.rules)
    assert entries_after == 1
def test_filter_with_field_mapping_against_it(sigma_filter, test_backend, rule_collection):
    """Add a ``User`` -> ``User123`` field mapping and expect it in the filter clause."""
    rule_collection.rules.append(sigma_filter)

    # Field mapping, conditioned on the same logsource the filter declares
    logsource_match = LogsourceCondition(**sigma_filter.logsource.to_dict())
    rename_user = ProcessingItem(
        FieldMappingTransformation({"User": "User123"}),
        rule_conditions=[logsource_match],
    )
    test_backend.processing_pipeline.items.append(rename_user)

    queries = test_backend.convert(rule_collection)
    assert queries == ['(EventID=4625 or EventID2=4624) and not User123 startswith "adm_"']
def test_filter_sigma_collection_from_files(test_backend):
    """Load one rule directory and one filter directory and check the filtered conversion."""
    ruleset_paths = [Path("tests/files/rule_valid"), Path("tests/files/filter_valid")]
    rule_collection = SigmaCollection.load_ruleset(ruleset_paths)

    assert len(rule_collection.rules) == 2

    queries = test_backend.convert(rule_collection)
    assert queries == ['EventID=1234 and not ComputerName startswith "DC-"']
def test_filter_sigma_collection_from_files_duplicated(test_backend):
    """Load the filter directory twice and expect the filter clause to appear twice."""
    rule_dir = Path("tests/files/rule_valid")
    filter_dir = Path("tests/files/filter_valid")

    # The same filter directory is listed twice on purpose.
    rule_collection = SigmaCollection.load_ruleset([rule_dir, filter_dir, filter_dir])

    assert len(rule_collection.rules) == 3

    expected_query = (
        'EventID=1234 and not ComputerName startswith "DC-" and not ComputerName startswith "DC-"'
    )
    assert test_backend.convert(rule_collection) == [expected_query]
def test_filter_sigma_collection_from_ruleset(sigma_filter, test_backend):
    """Apply a re-targeted copy of the filter to a ruleset loaded from disk.

    The filter is rebuilt with logsource ``{"category": "test"}`` and one extra rule reference,
    appended to the loaded ruleset, and the three converted queries are compared verbatim.
    """
    rule_collection = SigmaCollection.load_ruleset([Path("tests/files/correlation_rule_valid")])

    # Rebuild the filter from its dict form with only the logsource overridden.
    retargeted_filter = SigmaFilter.from_dict(
        {**sigma_filter.to_dict(), "logsource": {"category": "test"}}
    )
    retargeted_filter.filter.rules.append(
        SigmaRuleReference("5d8fd9da-6916-45ef-8d4d-3fa9d19d1a64")
    )
    rule_collection.rules.append(retargeted_filter)

    assert len(rule_collection.rules) == 7

    event_count_query = (
        'mappedA="value1" and mappedB="value2" and not User startswith "adm_"\n'
        "| aggregate window=15min count() as event_count by fieldC, fieldD\n"
        "| where event_count >= 10"
    )
    value_count_query = (
        'mappedA="value1" and mappedB="value2" and not User startswith "adm_"\n'
        "| aggregate window=15min value_count(fieldD) as value_count by fieldC\n"
        "| where value_count < 10"
    )
    temporal_query = (
        'subsearch { mappedA="value1" and mappedB="value2" | set '
        'event_type="base_rule_1" | set field=fieldC }\n'
        'subsearch { mappedA="value3" and mappedB="value4" | set '
        'event_type="base_rule_2" | set field=fieldD }\n'
        "\n"
        "| temporal window=15min eventtypes=base_rule_1,base_rule_2 by fieldC\n"
        "\n"
        "| where eventtype_count >= 2"
    )

    assert test_backend.convert(rule_collection) == [
        event_count_query,
        value_count_query,
        temporal_query,
    ]
def test_invalid_rule_id_matching(sigma_filter, test_backend, rule_collection):
    """Expect an unfiltered query once the rule's ID no longer equals a referenced ID."""
    target_rule = rule_collection.rules[0]
    rule_collection.rules.append(sigma_filter)

    # Change the rule id to something the filter does not reference
    target_rule.id = "invalid-id"

    queries = test_backend.convert(rule_collection)
    assert queries == ["EventID=4625 or EventID2=4624"]
def test_no_rules_section(sigma_filter, test_backend, rule_collection):
    """Expect an unfiltered query when the filter's ``rules`` attribute is set to ``None``."""
    rule_collection.rules.append(sigma_filter)

    # Index 1 is the filter that was just appended after the single rule.
    appended_filter = rule_collection.rules[1]
    appended_filter.filter.rules = None

    queries = test_backend.convert(rule_collection)
    assert queries == ["EventID=4625 or EventID2=4624"]
# Validation Errors
@pytest.mark.parametrize(
    "transformation,error",
    (
        # Remove the key entirely
        (lambda data: data.pop("logsource", None), SigmaLogsourceError),
        (lambda data: data.pop("filter", None), SigmaFilterError),
        (lambda data: data.pop("title", None), SigmaTitleError),
        (lambda data: data["filter"].pop("condition", None), SigmaFilterConditionError),
        (lambda data: data["filter"].pop("selection", None), SigmaDetectionError),
        (lambda data: data["filter"].pop("rules", None), SigmaFilterRuleReferenceError),
        # Set the value to None
        (lambda data: data.update({"logsource": None}), SigmaLogsourceError),
        (lambda data: data.update({"filter": None}), SigmaFilterError),
        (lambda data: data.update({"title": None}), SigmaTitleError),
        (lambda data: data["filter"].update({"condition": None}), SigmaFilterConditionError),
        (lambda data: data["filter"].update({"rules": None}), SigmaFilterRuleReferenceError),
    ),
)
def test_filter_validation_errors(transformation: Callable, error, sigma_filter):
    """Expect ``SigmaFilter.from_dict`` to raise ``error`` for each broken filter dictionary.

    ``transformation`` is called with the filter's dict form and mutates it in place before
    the dictionary is parsed again.
    """
    # Work on the dict form of the fixture and break it with the parametrized mutation.
    broken_filter = sigma_filter.to_dict()
    transformation(broken_filter)

    with pytest.raises(error):
        SigmaFilter.from_dict(broken_filter)
def test_sigma_filter_with_multiple_conditions_raises_error(sigma_filter):
    """Expect ``SigmaFilterConditionError`` when the filter condition is a list of two items."""
    # Take the dict form of the fixture and replace the single condition with a list.
    filter_dict = sigma_filter.to_dict()
    filter_dict["filter"]["condition"] = ["selection", "not selection"]

    with pytest.raises(SigmaFilterConditionError):
        SigmaFilter.from_dict(filter_dict)
# NOTE(review): the test below is disabled and the reason is not recorded here. Confirm whether
# a filter whose logsource is a subset of the rule's logsource is expected to apply before
# re-enabling or removing it.
# def test_logsource_subset(sigma_filter, test_backend, rule_collection):
#     # Remove logsource.category
#     new_filter = sigma_filter.to_dict()
#     new_filter["logsource"].pop("category")
#     sigma_filter = SigmaFilter.from_dict(new_filter)
#     rule_collection.rules += [sigma_filter]
#
#     rule_collection.resolve_rule_references()
#
#     assert test_backend.convert(rule_collection) == [
#         '(EventID=4625 or EventID2=4624) and not User startswith "adm_"'
#     ]