Skip to content

Commit

Permalink
Correlation rule awareness of validators
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Jan 29, 2024
1 parent f0d1ffd commit fb78999
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 19 deletions.
18 changes: 11 additions & 7 deletions sigma/validators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import re
from typing import ClassVar, Dict, List, Optional, Set, Type
import sigma
from sigma.rule import SigmaDetection, SigmaDetectionItem, SigmaRule, SigmaRuleTag
from sigma.correlations import SigmaCorrelationRule
from sigma.rule import SigmaDetection, SigmaDetectionItem, SigmaRule, SigmaRuleBase, SigmaRuleTag
from sigma.types import SigmaString, SigmaType


Expand Down Expand Up @@ -70,7 +71,7 @@ class SigmaRuleValidator(ABC):
"""

@abstractmethod
def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
"""Implementation of the rule validation.
:param rule: Sigma rule that should be validated.
Expand Down Expand Up @@ -110,11 +111,14 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
Iterate over all detections and call validate_detection() for each.
"""
super().validate(rule)
return [
issue
for name, detection in rule.detection.detections.items()
for issue in self.validate_detection(name, detection)
]
if isinstance(rule, SigmaCorrelationRule):
return []
else:
return [
issue
for name, detection in rule.detection.detections.items()
for issue in self.validate_detection(name, detection)
]

@abstractmethod
def validate_detection(
Expand Down
10 changes: 10 additions & 0 deletions sigma/validators/core/condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
from typing import ClassVar, List, Set
from sigma.conditions import ConditionIdentifier, ConditionItem, ConditionSelector
from sigma.correlations import SigmaCorrelationRule
from sigma.rule import SigmaDetections, SigmaRule
from sigma.validators.base import (
SigmaValidationIssue,
Expand Down Expand Up @@ -49,6 +50,9 @@ def condition_referenced_ids(
return set()

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if isinstance(rule, SigmaCorrelationRule):
return [] # Correlation rules do not have detections

detection_names = { # collect detection names
name for name in rule.detection.detections.keys()
}
Expand All @@ -70,6 +74,9 @@ class ThemConditionWithSingleDetectionValidator(SigmaRuleValidator):
"""Detect conditions refering to 'them' with only one detection."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if isinstance(rule, SigmaCorrelationRule):
return [] # Correlation rules do not have detections

if (
any(["them" in condition for condition in rule.detection.condition])
and len(rule.detection.detections) == 1
Expand All @@ -93,6 +100,9 @@ class AllOfThemConditionValidator(SigmaRuleValidator):
re_all_of_them: ClassVar[Pattern] = re.compile("all\\s+of\\s+them")

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if isinstance(rule, SigmaCorrelationRule):
return [] # Correlation rules do not have detections

if any([self.re_all_of_them.search(condition) for condition in rule.detection.condition]):
return [AllOfThemConditionIssue([rule])]
else:
Expand Down
4 changes: 4 additions & 0 deletions sigma/validators/core/logsources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import dataclass, field
from typing import ClassVar, Dict, List, Tuple
from sigma.correlations import SigmaCorrelationRule
from sigma.rule import SigmaDetectionItem, SigmaRule
from sigma.types import SigmaNumber

Expand Down Expand Up @@ -68,6 +69,9 @@ class SpecificInsteadOfGenericLogsourceValidator(SigmaDetectionItemValidator):
"""Identify usage of specific Windows event identifiers where corresponding generic log sources exist."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if isinstance(rule, SigmaCorrelationRule):
return [] # Correlation rules do not have detections

for (
logsource,
eventid_mappings,
Expand Down
10 changes: 10 additions & 0 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
IdentifierExistenceIssue,
IdentifierCollisionIssue,
)
from .test_correlations import correlation_rule


@pytest.fixture
Expand All @@ -33,6 +34,15 @@ def test_sigmavalidator_validate_rules(rule_with_id, rule_without_id, rules_with
]


def test_sigmavalidator_validate_correlation_rules(correlation_rule):
rules = SigmaCollection([correlation_rule])
validator = SigmaValidator(
{IdentifierExistenceValidator, IdentifierUniquenessValidator, DanglingDetectionValidator}
)
issues = validator.validate_rules(rules)
assert issues == [IdentifierExistenceIssue([correlation_rule])]


def test_sigmavalidator_exclusions(rule_with_id, rule_without_id, rules_with_id_collision):
rules = SigmaCollection([rule_with_id, rule_without_id, *rules_with_id_collision])
exclusions = {
Expand Down
37 changes: 31 additions & 6 deletions tests/test_validators.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
from uuid import UUID
from wsgiref.validate import validator

import pytest
from sigma.exceptions import SigmaValueError

from sigma.rule import SigmaDetectionItem, SigmaLogSource, SigmaRule
from sigma.types import SigmaString

from sigma.validators.core.logsources import (
SpecificInsteadOfGenericLogsourceValidator,
SpecificInsteadOfGenericLogsourceIssue,
Expand All @@ -25,6 +19,7 @@
EscapedWildcardIssue,
EscapedWildcardValidator,
)
from .test_correlations import correlation_rule


@pytest.fixture
Expand Down Expand Up @@ -116,6 +111,11 @@ def test_validator_double_wildcard_valid():
assert validator.validate(rule) == []


def test_validator_double_wildcard_correlation_rule(correlation_rule):
validator = DoubleWildcardValidator()
assert validator.validate(correlation_rule) == []


def test_validator_number_as_string():
validator = NumberAsStringValidator()
rule = SigmaRule.from_yaml(
Expand Down Expand Up @@ -151,6 +151,11 @@ def test_validator_number_as_string_valid():
assert validator.validate(rule) == []


def test_validator_number_as_string_correlation_rule(correlation_rule):
validator = NumberAsStringValidator()
assert validator.validate(correlation_rule) == []


def test_validator_control_characters():
validator = ControlCharacterValidator()
rule = SigmaRule.from_yaml(
Expand All @@ -169,6 +174,11 @@ def test_validator_control_characters():
assert validator.validate(rule) == [ControlCharacterIssue([rule], SigmaString("\temp"))]


def test_validator_control_characters_correlation_rule(correlation_rule):
validator = ControlCharacterValidator()
assert validator.validate(correlation_rule) == []


def test_validator_wildcards_instead_of_contains():
validator = WildcardsInsteadOfModifiersValidator()
rule = SigmaRule.from_yaml(
Expand Down Expand Up @@ -288,6 +298,11 @@ def test_validator_wildcards_instead_of_modifiers_inconsistent():
assert validator.validate(rule) == []


def test_validator_wildcards_instead_of_modifiers_correlation_rule(correlation_rule):
validator = WildcardsInsteadOfModifiersValidator()
assert validator.validate(correlation_rule) == []


def test_validator_sysmon_insteadof_generic_logsource():
validator = SpecificInsteadOfGenericLogsourceValidator()
rule = SigmaRule.from_yaml(
Expand Down Expand Up @@ -359,6 +374,11 @@ def test_validator_sysmon_insteadof_generic_logsource_other_valid():
assert validator.validate(rule) == []


def test_validator_specific_insteadof_generic_correlation_rule(correlation_rule):
validator = SpecificInsteadOfGenericLogsourceValidator()
assert validator.validate(correlation_rule) == []


def test_validator_escaped_wildcard():
validator = EscapedWildcardValidator()
rule = SigmaRule.from_yaml(
Expand Down Expand Up @@ -393,3 +413,8 @@ def test_validator_escaped_wildcard_valid():
"""
)
assert validator.validate(rule) == []


def test_validator_escaped_wildcard_correlation_rule(correlation_rule):
validator = EscapedWildcardValidator()
assert validator.validate(correlation_rule) == []
20 changes: 16 additions & 4 deletions tests/test_validators_condition.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import pytest
from wsgiref.validate import validator

from sigma.rule import SigmaRule


from sigma.validators.core.condition import (
AllOfThemConditionIssue,
AllOfThemConditionValidator,
Expand All @@ -12,6 +8,7 @@
ThemConditionWithSingleDetectionIssue,
ThemConditionWithSingleDetectionValidator,
)
from .test_correlations import correlation_rule


def test_validator_dangling_detection():
Expand All @@ -37,6 +34,11 @@ def test_validator_dangling_detection():
assert validator.validate(rule) == [DanglingDetectionIssue([rule], "unreferenced")]


def test_validator_dangling_detection_correlation_rule(correlation_rule):
validator = DanglingDetectionValidator()
assert validator.validate(correlation_rule) == []


def test_validator_dangling_detection_valid():
validator = DanglingDetectionValidator()
rule = SigmaRule.from_yaml(
Expand Down Expand Up @@ -136,6 +138,11 @@ def test_validator_them_condition_with_multiple_detection():
assert validator.validate(rule) == []


def test_validator_them_condition_correlation_rule(correlation_rule):
validator = ThemConditionWithSingleDetectionValidator()
assert validator.validate(correlation_rule) == []


def test_validator_all_of_them():
validator = AllOfThemConditionValidator()
rule = SigmaRule.from_yaml(
Expand Down Expand Up @@ -172,3 +179,8 @@ def test_validator_all_of_them_valid():
"""
)
assert validator.validate(rule) == []


def test_validator_all_of_them_correlation_rule(correlation_rule):
validator = AllOfThemConditionValidator()
assert validator.validate(correlation_rule) == []
8 changes: 6 additions & 2 deletions tests/test_validators_modifiers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from uuid import UUID
from wsgiref.validate import validator

import pytest
from sigma.exceptions import SigmaValueError
from sigma.rule import SigmaDetectionItem, SigmaLogSource, SigmaRule
from sigma.types import SigmaString
from .test_correlations import correlation_rule


from sigma.modifiers import (
Expand Down Expand Up @@ -107,6 +106,11 @@ def test_validator_base64offset_without_contains_modifier():
]


def test_validator_invalid_modifier_combination_correlation_rule(correlation_rule):
validator = InvalidModifierCombinationsValidator()
assert validator.validate(correlation_rule) == []


def test_validator_base64offset_after_contains_modifier():
with pytest.raises(SigmaValueError, match="strings with wildcards"):
rule = SigmaRule.from_yaml(
Expand Down

0 comments on commit fb78999

Please sign in to comment.