Skip to content

Commit

Permalink
Merge pull request #322 from m4dh4t/feat-re-expand
Browse files Browse the repository at this point in the history
feat(placeholders): allow regex valuelist transformation
  • Loading branch information
thomaspatzke authored Feb 25, 2025
2 parents f9d2f18 + f056909 commit 286344f
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 24 deletions.
5 changes: 0 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,3 @@ repos:
rev: 24.4.2
hooks:
- id: black
# It is recommended to specify the latest version of Python
# supported by your project here, or alternatively use
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.11
23 changes: 14 additions & 9 deletions sigma/modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ def modify(
if not val.endswith(SpecialChars.WILDCARD_MULTI):
val += SpecialChars.WILDCARD_MULTI
elif isinstance(val, SigmaRegularExpression):
if val.regexp[:2] != ".*" and val.regexp[0] != "^":
val.regexp = ".*" + val.regexp
if val.regexp[-2:] != ".*" and val.regexp[-1] != "$":
val.regexp = val.regexp + ".*"
regexp_str = str(val.regexp)
if regexp_str[:2] != ".*" and regexp_str[0] != "^":
val.regexp = SigmaString(".") + SpecialChars.WILDCARD_MULTI + val.regexp
if regexp_str[-2:] != ".*" and regexp_str[-1] != "$":
val.regexp += SigmaString(".") + SpecialChars.WILDCARD_MULTI
val.compile()
elif isinstance(val, SigmaFieldReference):
val.starts_with = True
Expand All @@ -149,8 +150,9 @@ def modify(
if not val.endswith(SpecialChars.WILDCARD_MULTI):
val += SpecialChars.WILDCARD_MULTI
elif isinstance(val, SigmaRegularExpression):
if val.regexp[-2:] != ".*" and val.regexp[-1] != "$":
val.regexp = val.regexp + ".*"
regexp_str = str(val.regexp)
if regexp_str[-2:] != ".*" and regexp_str[-1] != "$":
val.regexp += SigmaString(".") + SpecialChars.WILDCARD_MULTI
val.compile()
elif isinstance(val, SigmaFieldReference):
val.starts_with = True
Expand All @@ -167,8 +169,9 @@ def modify(
if not val.startswith(SpecialChars.WILDCARD_MULTI):
val = SpecialChars.WILDCARD_MULTI + val
elif isinstance(val, SigmaRegularExpression):
if val.regexp[:2] != ".*" and val.regexp[0] != "^":
val.regexp = ".*" + val.regexp
regexp_str = str(val.regexp)
if regexp_str[:2] != ".*" and regexp_str[0] != "^":
val.regexp = SigmaString(".") + SpecialChars.WILDCARD_MULTI + val.regexp
val.compile()
elif isinstance(val, SigmaFieldReference):
val.ends_with = True
Expand Down Expand Up @@ -391,7 +394,9 @@ class SigmaExpandModifier(SigmaValueModifier):
specific list item or lookup by the processing pipeline.
"""

def modify(self, val: SigmaString) -> SigmaString:
def modify(
self, val: Union[SigmaString, SigmaRegularExpression]
) -> Union[SigmaString, SigmaRegularExpression]:
return val.insert_placeholders()


Expand Down
7 changes: 5 additions & 2 deletions sigma/processing/transformations/placeholder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sigma.types import (
Placeholder,
SigmaString,
SigmaRegularExpression,
SpecialChars,
SigmaQueryExpression,
)
Expand Down Expand Up @@ -54,8 +55,10 @@ def __post_init__(self):
super().__post_init__()

def apply_value(
self, field: str, val: SigmaString
) -> Union[SigmaString, Iterable[SigmaString]]:
self, field: str, val: Union[SigmaString, SigmaRegularExpression]
) -> Union[
SigmaString, Iterable[SigmaString], SigmaRegularExpression, Iterable[SigmaRegularExpression]
]:
if val.contains_placeholder(self.include, self.exclude):
return val.replace_placeholders(self.placeholder_replacements_base)
else:
Expand Down
47 changes: 40 additions & 7 deletions sigma/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from abc import ABC
from dataclasses import dataclass, field
from dataclasses import dataclass, field, InitVar
from enum import Enum, auto
from ipaddress import IPv4Network, IPv6Network, ip_network
from math import inf
Expand Down Expand Up @@ -693,7 +693,8 @@ class SigmaRegularExpressionFlag(Enum):
class SigmaRegularExpression(SigmaType):
"""Regular expression type"""

regexp: str
regexp: SigmaString = field(init=False)
regexp_init: InitVar[Union[SigmaString, str]]
flags: Set[SigmaRegularExpressionFlag] = field(default_factory=set)
sigma_to_python_flags: ClassVar[Dict[SigmaRegularExpressionFlag, re.RegexFlag]] = {
SigmaRegularExpressionFlag.IGNORECASE: re.IGNORECASE,
Expand All @@ -706,7 +707,14 @@ class SigmaRegularExpression(SigmaType):
SigmaRegularExpressionFlag.DOTALL: "s",
}

def __post_init__(self):
def __post_init__(
self,
regexp_init: Union[str, SigmaString],
):
if isinstance(regexp_init, str):
regexp_init = SigmaString(regexp_init)

self.regexp = regexp_init
self.compile()

def add_flag(self, flag: SigmaRegularExpressionFlag):
Expand All @@ -718,10 +726,10 @@ def compile(self):
flags = 0
for flag in self.flags:
flags |= self.sigma_to_python_flags[flag]
re.compile(self.regexp, flags)
re.compile(self.escape(), flags)
except re.error as e:
raise SigmaRegularExpressionError(
f"Regular expression '{self.regexp}' is invalid: {str(e)}"
f"Regular expression '{self.escape()}' is invalid: {str(e)}"
) from e

def escape(
Expand All @@ -741,9 +749,10 @@ def escape(
if e is not None
]
)
regexp_str = str(self.regexp)
pos = (
[ # determine positions of matches in regular expression
m.start() for m in re.finditer(r, self.regexp)
m.start() for m in re.finditer(r, regexp_str)
]
if r != ""
else []
Expand All @@ -758,7 +767,31 @@ def escape(
else:
prefix = ""

return prefix + escape_char.join([self.regexp[i:j] for i, j in ranges])
return prefix + escape_char.join([regexp_str[i:j] for i, j in ranges])

def contains_placeholder(
self, include: Optional[List[str]] = None, exclude: Optional[List[str]] = None
) -> bool:
return self.regexp.contains_placeholder(include, exclude)

def insert_placeholders(self) -> "SigmaRegularExpression":
"""
Replace %something% placeholders with Placeholder stub objects that can be later handled by the processing
pipeline. This implements the expand modifier.
"""
self.regexp = self.regexp.insert_placeholders()
return self

def replace_placeholders(
self, callback: Callable[[Placeholder], Iterator[Union[str, SpecialChars, Placeholder]]]
) -> List["SigmaRegularExpression"]:
"""
Replace all occurrences of string part matching regular expression with placeholder.
"""
return [
SigmaRegularExpression(str(sigmastr), self.flags)
for sigmastr in self.regexp.replace_placeholders(callback)
]


@dataclass
Expand Down
53 changes: 53 additions & 0 deletions tests/test_conversion_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
FieldMappingTransformation,
QueryExpressionPlaceholderTransformation,
SetStateTransformation,
ValueListPlaceholderTransformation,
)
from sigma.exceptions import SigmaPlaceholderError, SigmaTypeError, SigmaValueError
import pytest
Expand Down Expand Up @@ -1316,6 +1317,58 @@ def test_convert_value_regex_unbound_not_escaped_escape(test_backend):
)


def test_convert_value_regex_value_list():
pipeline = ProcessingPipeline(
[ProcessingItem(ValueListPlaceholderTransformation(["test"]))],
vars={"test": ["pat.*tern/foobar", "pat.*te\\rn/foobar"]},
)
backend = TextQueryTestBackend(pipeline)
assert (
backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
field|re|expand: "%test%"
condition: sel
"""
)
)
== ["field=/pat.*tern\\/foo\\bar/ or field=/pat.*te\\\\rn\\/foo\\bar/"]
)


def test_convert_value_regex_value_list_endswith():
pipeline = ProcessingPipeline(
[ProcessingItem(ValueListPlaceholderTransformation(["test"]))],
vars={"test": ["pat.*tern/foobar", "pat.*te\\rn/foobar"]},
)
backend = TextQueryTestBackend(pipeline)
assert (
backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
field|re|expand|endswith: "%test%"
condition: sel
"""
)
)
== ["field=/.*pat.*tern\\/foo\\bar/ or field=/.*pat.*te\\\\rn\\/foo\\bar/"]
)


def test_convert_value_cidr_wildcard_native_ipv4(test_backend):
assert (
test_backend.convert(
Expand Down
10 changes: 10 additions & 0 deletions tests/test_modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,16 @@ def test_expand(dummy_detection_item):
)


def test_expand_re(dummy_detection_item):
assert SigmaExpandModifier(dummy_detection_item, []).modify(
SigmaRegularExpression("test%var%test")
).regexp.s == (
"test",
Placeholder("var"),
"test",
)


def test_cidr(dummy_detection_item):
assert SigmaCIDRModifier(dummy_detection_item, []).modify(
SigmaString("192.168.1.0/24")
Expand Down
43 changes: 42 additions & 1 deletion tests/test_processing_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
SigmaTransformationError,
SigmaValueError,
)
from sigma.modifiers import SigmaExpandModifier
from sigma.modifiers import SigmaExpandModifier, SigmaRegularExpressionModifier
from sigma.processing.conditions import (
FieldNameProcessingItemAppliedCondition,
IncludeFieldCondition,
Expand Down Expand Up @@ -197,6 +197,24 @@ def sigma_rule_placeholders_simple():
)


@pytest.fixture
def sigma_rule_placeholders_simple_re():
return SigmaRule.from_dict(
{
"title": "Test",
"logsource": {"category": "test"},
"detection": {
"test": [
{
"field|re|expand": "value%var1%test%var2%end",
}
],
"condition": "test",
},
}
)


@pytest.fixture
def sigma_rule_placeholders_only():
return SigmaRule.from_dict(
Expand Down Expand Up @@ -1019,6 +1037,29 @@ def test_valuelist_placeholders(sigma_rule_placeholders_simple: SigmaRule):
)


def test_valuelist_placeholders_re(sigma_rule_placeholders_simple_re: SigmaRule):
transformation = ValueListPlaceholderTransformation()
pipeline = ProcessingPipeline(vars={"var1": ["val1", 123], "var2": "val3*"})
transformation.set_pipeline(pipeline)
transformation.apply(sigma_rule_placeholders_simple_re)
assert sigma_rule_placeholders_simple_re.detection.detections["test"] == SigmaDetection(
[
SigmaDetection(
[
SigmaDetectionItem(
"field",
[SigmaRegularExpressionModifier, SigmaExpandModifier],
[
SigmaString("valueval1testval3*end"),
SigmaString("value123testval3*end"),
],
),
]
)
]
)


def test_valuelist_placeholders_correlation_rule(sigma_correlation_rule, dummy_pipeline):
orig_correlation_rule = deepcopy(sigma_correlation_rule)
transformation = ValueListPlaceholderTransformation()
Expand Down

0 comments on commit 286344f

Please sign in to comment.