Skip to content

Commit

Permalink
Merge pull request #250 from frack113:kargs_V2
Browse files Browse the repository at this point in the history
Some update to the validators
  • Loading branch information
thomaspatzke authored Aug 10, 2024
2 parents 8863268 + 72983c2 commit f7252a5
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 85 deletions.
24 changes: 24 additions & 0 deletions docs/Rule_Validation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ applied to the rule. Example:
This exclusion defines that the *wildcards_instead_of_modifiers* validator check is disabled for the
rule with the identifier *5013332f-8a70-4e04-bcc1-06a98a2cca2e*.

Configuration
-------------

Validator checks that accept parameters can be configured with a dictionary that is passed as the
*config* parameter. This dictionary maps validator identifiers to dictionaries of parameter-value
pairs that are passed as keyword arguments to the validator constructor. Example:

.. code-block:: yaml
config:
description_length:
min_length: 100
Validator Checks
****************

Expand Down Expand Up @@ -148,6 +161,17 @@ desired rule part and takes care of the proper iteration of these parts. These c
* :py:class:`sigma.validators.base.SigmaTagValueValidator` for checking all tags appearing beloe the
*tags* attribute of a Sigma rule.

Parametrization of Checks
=========================

If required, checks can be parametrized by passing parameters as keyword arguments to the validator
check constructor. for this purpose, the validator check class must be a *frozen dataclass*. This
can be achieved by decorating the class with `@dataclass(frozen=True)` from the *dataclasses*
module.

The parameters can then be specified as dataclass members. The `SigmaValidator` instance will pass
the parameters to the validator check constructor as keyword arguments.

Base Classes
============

Expand Down
24 changes: 20 additions & 4 deletions sigma/validation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, Iterable, Iterator, List, Set, Type
from typing import DefaultDict, Dict, Iterable, Iterator, List, Set, Type, Union
from uuid import UUID
from sigma.collection import SigmaCollection
from sigma.exceptions import SigmaConfigurationError
from sigma.rule import SigmaRule
from sigma.validators.base import SigmaRuleValidator, SigmaValidationIssue
Expand All @@ -20,13 +19,17 @@ class SigmaValidator:

validators: Set[SigmaRuleValidator]
exclusions: DefaultDict[UUID, Set[Type[SigmaRuleValidator]]]
config: Dict[str, Dict[str, Union[str, int, float, bool]]]

def __init__(
self,
validators: Iterable[Type[SigmaRuleValidator]],
exclusions: Dict[UUID, Set[SigmaRuleValidator]] = dict(),
config: Dict[str, Dict[str, Union[str, int, float, bool]]] = dict(),
):
self.validators = {validator() for validator in validators}
self.validators = {
validator(**config.get(validator.__name__, {})) for validator in validators
}
self.exclusions = defaultdict(set, exclusions)

@classmethod
Expand All @@ -39,6 +42,8 @@ def from_dict(cls, d: Dict, validators: Dict[str, SigmaRuleValidator]) -> "Sigma
represents all known validators.
* exclusion: a map between rule ids and lists of validator names or a single validator name
to define validation exclusions.
* config: a map between validator names and configuration dicts that are passed as
keyword arguments to the validator constructor.
:param d: Definition of the SigmaValidator.
:type d: Dict
Expand Down Expand Up @@ -84,7 +89,18 @@ def from_dict(cls, d: Dict, validators: Dict[str, SigmaRuleValidator]) -> "Sigma
except KeyError as e:
raise SigmaConfigurationError(f"Unknown validator '{ e.args[0] }'")

return cls(validator_classes, exclusions)
# Build configuration dict
configuration = dict()
for validator_name, params in d.get("config", {}).items():
if validator_name not in validators:
raise SigmaConfigurationError(f"Unknown validator '{ validator_name }'")
if not isinstance(params, dict):
raise SigmaConfigurationError(
f"Configuration for validator '{ validator_name }' is not a dict."
)
configuration[validators[validator_name].__name__] = params

return cls(validator_classes, exclusions, configuration)

@classmethod
def from_yaml(
Expand Down
17 changes: 12 additions & 5 deletions sigma/validators/core/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,24 @@ def finalize(self) -> List[SigmaValidationIssue]:


@dataclass
class FilenameLenghIssue(SigmaValidationIssue):
class FilenameLengthIssue(SigmaValidationIssue):
description: ClassVar[str] = "Rule filename is too short or long"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.HIGH
filename: str


class FilenameLenghValidator(SigmaRuleValidator):
@dataclass(frozen=True)
class FilenameLengthValidator(SigmaRuleValidator):
"""Check rule filename lengh"""

min_size: int = 10
max_size: int = 90

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if rule.source is not None:
filename = rule.source.path.name
if len(filename) < 10 or len(filename) > 90:
return [FilenameLenghIssue(rule, filename)]
if len(filename) < self.min_size or len(filename) > self.max_size:
return [FilenameLengthIssue(rule, filename)]
return []


Expand Down Expand Up @@ -258,11 +262,14 @@ class DescriptionLengthIssue(SigmaValidationIssue):
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


@dataclass(frozen=True)
class DescriptionLengthValidator(SigmaRuleValidator):
"""Checks if rule has a description."""

min_length: int = 16

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if rule.description is not None and len(rule.description) < 16:
if rule.description is not None and len(rule.description) < self.min_length:
return [DescriptionLengthIssue([rule])]
else:
return []
Expand Down
84 changes: 30 additions & 54 deletions sigma/validators/core/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,87 +102,63 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:


@dataclass
class InvalidCVETagIssue(SigmaValidationIssue):
description: ClassVar[str] = "Invalid CVE tagging"
class InvalidNamespaceTagIssue(SigmaValidationIssue):
description: ClassVar[str] = "Invalid tagging namespace"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM
tag: SigmaRuleTag


class CVETagValidator(SigmaTagValidator):
"""Validate rule CVE tag"""
class NamespaceTagValidator(SigmaTagValidator):
"""Validate rule tag namespace"""

allowed_namespace = {"attack", "car", "stp", "cve", "tlp", "detection"}

def validate_tag(self, tag: SigmaRuleTag) -> List[SigmaValidationIssue]:
tags_pattern = re.compile(r"\d+\-\d+$")
if tag.namespace == "cve" and tags_pattern.match(tag.name) is None:
return [InvalidCVETagIssue([self.rule], tag)]
if tag.namespace not in self.allowed_namespace:
return [InvalidNamespaceTagIssue([self.rule], tag)]
return []


@dataclass
class InvalidDetectionTagIssue(SigmaValidationIssue):
description: ClassVar[str] = "Invalid detection tagging"
class InvalidPatternTagIssue(SigmaValidationIssue):
description: ClassVar[str] = "The tag is using an invalid pattern"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM
tag: SigmaRuleTag


class DetectionTagValidator(SigmaTagValidator):
"""Validate rule detection tag"""

allowed_tags = {"dfir", "emerging-threats", "threat-hunting"}
class TagPatternValidatorBase(SigmaTagValidator):
"""Base class for tag pattern validation"""

def validate_tag(self, tag: SigmaRuleTag) -> List[SigmaValidationIssue]:
if tag.namespace == "detection" and tag.name not in self.allowed_tags:
return [InvalidDetectionTagIssue([self.rule], tag)]
tags_pattern = re.compile(self.pattern)
if tag.namespace == self.namespace and tags_pattern.match(tag.name) is None:
return [InvalidPatternTagIssue([self.rule], tag)]
return []


@dataclass
class InvalidCARTagIssue(SigmaValidationIssue):
description: ClassVar[str] = "Invalid CAR tagging"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM
tag: SigmaRuleTag


class CARTagValidator(SigmaTagValidator):
class CARTagValidator(TagPatternValidatorBase):
"""Validate rule CAR tag"""

def validate_tag(self, tag: SigmaRuleTag) -> List[SigmaValidationIssue]:
tags_pattern = re.compile(r"\d{4}-\d{2}-\d{3}$")
if tag.namespace == "car" and tags_pattern.match(tag.name) is None:
return [InvalidCARTagIssue([self.rule], tag)]
return []
namespace = "car"
pattern = r"\d{4}-\d{2}-\d{3}$"


@dataclass
class InvalidSTPTagIssue(SigmaValidationIssue):
description: ClassVar[str] = "Invalid STP tagging"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM
tag: SigmaRuleTag

class CVETagValidator(TagPatternValidatorBase):
"""Validate rule CVE tag"""

class STPTagValidator(SigmaTagValidator):
"""Validate rule STP tag"""
namespace = "cve"
pattern = r"^\d+-\d+$"

def validate_tag(self, tag: SigmaRuleTag) -> List[SigmaValidationIssue]:
tags_pattern = re.compile(r"^[1-5]{1}[auk]{0,1}$")
if tag.namespace == "stp" and tags_pattern.match(tag.name) is None:
return [InvalidSTPTagIssue([self.rule], tag)]
return []

class DetectionTagValidator(TagPatternValidatorBase):
"""Validate rule detection tag"""

@dataclass
class InvalidNamespaceTagIssue(SigmaValidationIssue):
description: ClassVar[str] = "Invalid tagging name"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM
tag: SigmaRuleTag

namespace = "detection"
pattern = r"dfir|emerging-threats|threat-hunting"

class NamespaceTagValidator(SigmaTagValidator):
"""Validate rule tag name"""

allowed_namespace = {"attack", "car", "stp", "cve", "tlp", "detection"}
class STPTagValidator(TagPatternValidatorBase):
"""Validate rule STP tag"""

def validate_tag(self, tag: SigmaRuleTag) -> List[SigmaValidationIssue]:
if tag.namespace not in self.allowed_namespace:
return [InvalidNamespaceTagIssue([self.rule], tag)]
return []
namespace = "stp"
pattern = r"^[1-5]{1}[auk]{0,1}$"
19 changes: 19 additions & 0 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
from uuid import UUID
import pytest
from sigma.exceptions import SigmaConfigurationError
Expand All @@ -9,6 +10,7 @@
from tests.test_validators import rule_with_id, rule_without_id, rules_with_id_collision
from sigma.collection import SigmaCollection
from sigma.validators.core.metadata import (
DescriptionLengthValidator,
IdentifierExistenceValidator,
IdentifierUniquenessValidator,
IdentifierExistenceIssue,
Expand All @@ -22,6 +24,13 @@ def validators():
return InstalledSigmaPlugins.autodiscover().validators


@pytest.mark.parametrize("validator", InstalledSigmaPlugins.autodiscover().validators.values())
def test_parametrized_validators_are_frozen(validator):
assert not dataclasses.is_dataclass(validator) or (
dataclasses.is_dataclass(validator) and validator.__dataclass_params__.frozen
)


def test_sigmavalidator_validate_rules(rule_with_id, rule_without_id, rules_with_id_collision):
rules = SigmaCollection([rule_with_id, rule_without_id, *rules_with_id_collision])
validator = SigmaValidator({IdentifierExistenceValidator, IdentifierUniquenessValidator})
Expand Down Expand Up @@ -72,11 +81,17 @@ def test_sigmavalidator_from_dict(validators):
"number_as_string",
],
},
"config": {
"description_length": {
"min_length": 100,
},
},
},
validators,
)
assert DanglingDetectionValidator in (v.__class__ for v in validator.validators)
assert TLPv1TagValidator not in (v.__class__ for v in validator.validators)
assert DescriptionLengthValidator(min_length=100) in validator.validators
assert len(validator.validators) >= 10
assert validator.exclusions == {
UUID("c702c6c7-1393-40e5-93f8-91469f3445ad"): {DanglingDetectionValidator},
Expand All @@ -99,11 +114,15 @@ def test_sigmavalidator_from_yaml(validators):
bf39335e-e666-4eaf-9416-47f1955b5fb3:
- attacktag
- number_as_string
config:
description_length:
min_length: 100
""",
validators,
)
assert DanglingDetectionValidator in (v.__class__ for v in validator.validators)
assert TLPv1TagValidator not in (v.__class__ for v in validator.validators)
assert DescriptionLengthValidator(min_length=100) in validator.validators
assert len(validator.validators) >= 10
assert validator.exclusions == {
UUID("c702c6c7-1393-40e5-93f8-91469f3445ad"): {DanglingDetectionValidator},
Expand Down
40 changes: 31 additions & 9 deletions tests/test_validators_metadata.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from uuid import UUID
from wsgiref.validate import validator

import pytest
from sigma.rule import SigmaRule
from sigma.types import SigmaString
from sigma.collection import SigmaCollection

from sigma.validators.core.metadata import (
Expand All @@ -23,8 +21,8 @@
DateExistenceIssue,
DuplicateFilenameValidator,
DuplicateFilenameIssue,
FilenameLenghValidator,
FilenameLenghIssue,
FilenameLengthValidator,
FilenameLengthIssue,
CustomAttributesValidator,
CustomAttributesIssue,
DescriptionExistenceValidator,
Expand Down Expand Up @@ -324,15 +322,22 @@ def test_validator_duplicate_filename_multiple_rules_in_one_file():
assert validator.finalize() == []


def test_validator_filename_lengh():
validator = FilenameLenghValidator()
def test_validator_filename_length():
validator = FilenameLengthValidator()
sigma_collection = SigmaCollection.load_ruleset(["tests/files/rule_filename_errors"])
rule = sigma_collection[0]
assert validator.validate(rule) == [FilenameLenghIssue([rule], "Name.yml")]
assert validator.validate(rule) == [FilenameLengthIssue([rule], "Name.yml")]


def test_validator_filename_lengh_valid():
validator = FilenameLenghValidator()
def test_validator_filename_length_customized_valid():
validator = FilenameLengthValidator(min_size=0, max_size=999)
sigma_collection = SigmaCollection.load_ruleset(["tests/files/rule_filename_errors"])
rule = sigma_collection[0]
assert validator.validate(rule) == []


def test_validator_filename_length_valid():
validator = FilenameLengthValidator()
sigma_collection = SigmaCollection.load_ruleset(["tests/files/rule_valid"])
rule = sigma_collection[0]
assert validator.validate(rule) == []
Expand Down Expand Up @@ -440,6 +445,23 @@ def test_validator_description_length_valid():
assert validator.validate(rule) == []


def test_validator_description_length_valid_customized():
validator = DescriptionLengthValidator(min_length=999)
rule = SigmaRule.from_yaml(
"""
title: Test
description: it is a simple description
logsource:
category: test
detection:
sel:
field: value
condition: sel
"""
)
assert validator.validate(rule) == [DescriptionLengthIssue([rule])]


def test_validator_level_existence():
validator = LevelExistenceValidator()
rule = SigmaRule.from_yaml(
Expand Down
Loading

0 comments on commit f7252a5

Please sign in to comment.