Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Mar 2, 2025
1 parent 39a9a7b commit b81d7dc
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 49 deletions.
48 changes: 26 additions & 22 deletions sigma/collection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dataclasses import dataclass, field
from functools import reduce
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Union, IO, cast
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, IO, cast
from uuid import UUID

import yaml
Expand All @@ -24,7 +24,7 @@
class SigmaCollection:
"""Collection of Sigma rules"""

rules: List[Union[SigmaRule, SigmaCorrelationRule]]
rules: List[Union[SigmaRule, SigmaCorrelationRule, SigmaFilter]]
errors: List[SigmaError] = field(default_factory=list)
ids_to_rules: Dict[UUID, SigmaRuleBase] = field(
init=False, repr=False, hash=False, compare=False
Expand Down Expand Up @@ -58,9 +58,7 @@ def resolve_rule_references(self) -> None:
rule.resolve_rule_references(self)

# Extract all filters from the rules
filters: List[SigmaFilter] = [
cast(SigmaFilter, rule) for rule in self.rules if isinstance(rule, SigmaFilter)
]
filters: List[SigmaFilter] = [rule for rule in self.rules if isinstance(rule, SigmaFilter)]
self.rules = [rule for rule in self.rules if not isinstance(rule, SigmaFilter)]

# Apply filters on each rule and replace the rule with the filtered rule
Expand Down Expand Up @@ -93,9 +91,9 @@ def from_dicts(
If the collect_errors parameters is set, exceptions are not raised while parsing but collected
in the errors property individually for each Sigma rule and the whole SigmaCollection.
"""
errors = []
parsed_rules: List[SigmaRuleBase] = list()
prev_rule = None
errors: List[SigmaError] = []
parsed_rules: List[Union[SigmaRule, SigmaCorrelationRule, SigmaFilter]] = list()
prev_rule = dict()
global_rule: NestedDict = dict()

for i, rule in zip(range(1, len(rules) + 1), rules):
Expand Down Expand Up @@ -156,7 +154,7 @@ def from_dicts(
@classmethod
def from_yaml(
cls,
yaml_str: Union[bytes, str, IO],
yaml_str: Union[bytes, str, IO[Any]],
collect_errors: bool = False,
source: Optional[SigmaRuleLocation] = None,
) -> "SigmaCollection":
Expand All @@ -181,10 +179,10 @@ def resolve_paths(
paths = ( # Normalize all inputs into paths
input if isinstance(input, Path) else Path(input) for input in inputs
)
paths = ( # Recurse into directories if provided
paths_recurse = ( # Recurse into directories if provided
path.glob(recursion_pattern) if path.is_dir() else (path,) for path in paths
)
return (subpath for subpaths in paths for subpath in subpaths) # Flatten the list
return (subpath for subpaths in paths_recurse for subpath in subpaths) # Flatten the list

@classmethod
def load_ruleset(
Expand Down Expand Up @@ -226,17 +224,23 @@ def load_ruleset(
if (
on_beforeload is not None
): # replace path with return value of on_beforeload function if provided
path = on_beforeload(path)
if path is not None: # Skip if path is None
result_path: Optional[Path] = on_beforeload(path)
else:
result_path = path
if result_path is not None: # Skip if path is None
sigma_collection = SigmaCollection.from_yaml(
path.open(encoding="utf-8"), collect_errors, SigmaRuleLocation(path)
result_path.open(encoding="utf-8"),
collect_errors,
SigmaRuleLocation(result_path),
)
if (
on_load is not None
): # replace SigmaCollection generated from file content with the return value from on_load function if provided
sigma_collection = on_load(path, sigma_collection)
if sigma_collection is not None: # Skip if nothing
sigma_collections.append(sigma_collection)
result_sigma_collection = on_load(result_path, sigma_collection)
else:
result_sigma_collection = sigma_collection
if result_sigma_collection is not None: # Skip if nothing
sigma_collections.append(result_sigma_collection)

# Finally merge all SigmaCollection's and return the result
return cls.merge(sigma_collections)
Expand All @@ -245,7 +249,7 @@ def load_ruleset(
def merge(cls, collections: Iterable["SigmaCollection"]) -> "SigmaCollection":
"""Merge multiple SigmaCollection objects into one and return it."""
return cls(
rules=[rule for collection in collections for rule in collection],
rules=[rule for collection in collections for rule in collection.rules],
errors=[error for collection in collections for error in collection.errors],
)

Expand All @@ -257,13 +261,13 @@ def get_unreferenced_rules(self) -> Iterable[SigmaRuleBase]:
"""Returns an iterator across all rules that are not referenced by any other rule"""
return (rule for rule in self.rules if not rule._backreferences)

def __iter__(self):
def __iter__(self) -> Iterable[SigmaRuleBase]:
return iter(self.rules)

def __len__(self):
def __len__(self) -> int:
return len(self.rules)

def __getitem__(self, i: Union[int, str, UUID]):
def __getitem__(self, i: Union[int, str, UUID]) -> SigmaRuleBase:
try:
if isinstance(i, int): # Index by position
return self.rules[i]
Expand All @@ -280,7 +284,7 @@ def __getitem__(self, i: Union[int, str, UUID]):
raise SigmaRuleNotFoundError(f"Rule '{ i }' not found in rule collection")


def deep_dict_update(dest: SigmaRule, src: SigmaRule) -> "SigmaRule":
def deep_dict_update(dest: Dict[Any, Any], src: Dict[Any, Any]) -> Dict[Any, Any]:
for k, v in src.items():
if isinstance(v, dict):
dest[k] = deep_dict_update(dest.get(k, {}), v)
Expand Down
58 changes: 41 additions & 17 deletions sigma/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
ParseResults,
ParseException,
)
from typing import ClassVar, List, Literal, Optional, Union, Type
from typing import ClassVar, List, Literal, Optional, Union, Type, cast
from sigma.types import SigmaType
from sigma.exceptions import SigmaConditionError, SigmaRuleLocation
import sigma
import sigma.rule


@dataclass
Expand Down Expand Up @@ -50,17 +51,17 @@ def postprocess(
detections: "sigma.rule.SigmaDetections",
parent: Optional["ConditionItem"] = None,
source: Optional[SigmaRuleLocation] = None,
) -> "ConditionItem":
) -> "ConditionItem | ConditionFieldEqualsValueExpression | ConditionValueExpression":
"""
Minimal default postprocessing implementation for classes which don't bring their own postprocess method.
Just sets the parent and source property.
"""
self.parent = parent
try:
self.source = source or self.source
self.source: Optional[SigmaRuleLocation] = source or self.source
except AttributeError:
self.source = None
return self
return cast("ConditionItem", self)


@dataclass
Expand All @@ -71,6 +72,7 @@ class ConditionItem(ParentChainMixin, ABC):
)
args: List[
Union[
"ConditionIdentifier",
"ConditionItem",
"ConditionFieldEqualsValueExpression",
"ConditionValueExpression",
Expand All @@ -79,17 +81,19 @@ class ConditionItem(ParentChainMixin, ABC):
source: Optional[SigmaRuleLocation] = field(default=None, compare=False)

@classmethod
def from_parsed(cls, s: str, l: int, t: Union[ParseResults, list]) -> List["ConditionItem"]:
def from_parsed(
cls, s: str, l: int, t: Union[ParseResults, List["ConditionItem"]]
) -> List["ConditionItem"]:
"""Create condition object from parse result"""
if cls.arg_count == 1:
if cls.token_list:
args = [t[0]]
else:
elif isinstance(t, ParseResults):
args = [t[0][-1]]
elif cls.arg_count > 1:
if cls.token_list:
args = t[0::2]
else:
elif isinstance(t, ParseResults):
args = t[0][0::2]
else: # pragma: no cover
args = list() # this case can only happen if broken classes are defined
Expand All @@ -100,7 +104,7 @@ def postprocess(
detections: "sigma.rule.SigmaDetections",
parent: Optional["ConditionItem"] = None,
source: Optional[SigmaRuleLocation] = None,
) -> "ConditionItem":
) -> "ConditionItem | ConditionFieldEqualsValueExpression | ConditionValueExpression":
"""
Postprocess condition parse tree after initial parsing. In this stage the detections
are available, this allows to resolve references to detections into concrete conditions.
Expand Down Expand Up @@ -144,11 +148,12 @@ class ConditionNOT(ConditionItem):

@dataclass
class ConditionIdentifier(ConditionItem):
args: List[str] # type: ignore
arg_count: ClassVar[int] = 1
token_list: ClassVar[bool] = True
identifier: str = field(init=False)

def __post_init__(self):
def __post_init__(self) -> None:
self.identifier = self.args[0]

def postprocess(
Expand All @@ -171,12 +176,13 @@ def postprocess(

@dataclass
class ConditionSelector(ConditionItem):
args: List[str] # type: ignore
arg_count: ClassVar[int] = 2
token_list: ClassVar[bool] = True
cond_class: Union[ConditionAND, ConditionOR] = field(init=False)
cond_class: Union[type[ConditionAND], type[ConditionOR]] = field(init=False)
pattern: str = field(init=False)

def __post_init__(self):
def __post_init__(self) -> None:
if self.args[0] in ["1", "any"]:
self.cond_class = ConditionOR
elif self.args[0] == "all":
Expand All @@ -185,7 +191,9 @@ def __post_init__(self):
raise SigmaConditionError("Invalid quantifier in selector", source=self.source)
self.pattern = self.args[1]

def resolve_referenced_detections(self, detections: "sigma.rule.SigmaDetections") -> List[str]:
def resolve_referenced_detections(
self, detections: "sigma.rule.SigmaDetections"
) -> List[ConditionIdentifier]:
"""
Resolve all detection identifiers referenced by the selector.
"""
Expand All @@ -205,12 +213,24 @@ def postprocess(
detections: "sigma.rule.SigmaDetections",
parent: Optional["ConditionItem"] = None,
source: Optional[SigmaRuleLocation] = None,
) -> Union[ConditionAND, ConditionOR]:
) -> Union[ConditionItem, "ConditionFieldEqualsValueExpression", "ConditionValueExpression"]:
"""Converts selector into an AND or OR condition"""
self.parent = parent

ids = self.resolve_referenced_detections(detections)
cond = self.cond_class(ids)
cond = self.cond_class(
cast(
List[
Union[
ConditionIdentifier,
ConditionItem,
ConditionFieldEqualsValueExpression,
ConditionValueExpression,
]
],
ids,
)
)
return cond.postprocess(detections, parent, source)


Expand Down Expand Up @@ -254,7 +274,9 @@ class SigmaCondition(ProcessingItemTrackingMixin):
detections: "sigma.rule.SigmaDetections"
source: Optional[SigmaRuleLocation] = field(default=None, compare=False)

def parse(self, postprocess: bool = True):
def parse(
self, postprocess: bool = True
) -> Union[ConditionItem, ConditionFieldEqualsValueExpression, ConditionValueExpression]:
"""
Parse condition and return parse tree (no postprocessing) or condition tree (postprocessed).
Expand All @@ -268,7 +290,7 @@ def parse(self, postprocess: bool = True):
"The pipe syntax in Sigma conditions has been deprecated and replaced by Sigma correlations. pySigma doesn't supports this syntax."
)
try:
parsed = condition.parseString(self.condition, parse_all=True)[0]
parsed = cast(ConditionItem, condition.parseString(self.condition, parse_all=True)[0])
if postprocess:
return parsed.postprocess(self.detections, source=self.source)
else:
Expand All @@ -277,7 +299,9 @@ def parse(self, postprocess: bool = True):
raise SigmaConditionError(str(e), source=self.source)

@property
def parsed(self):
def parsed(
self, postprocess: bool = True
) -> Union[ConditionItem, ConditionFieldEqualsValueExpression, ConditionValueExpression]:
"""
Parse on first access on parsed condition tree.
Expand Down
21 changes: 11 additions & 10 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,18 @@ def convert(
processing.
"""
rule_collection.resolve_rule_references()
queries = [
query
for rule in rule_collection.rules
for query in (
self.convert_rule(rule, output_format or self.default_format)
if isinstance(rule, SigmaRule)
else self.convert_correlation_rule(
queries = []
for rule in rule_collection.rules:
if isinstance(rule, SigmaRule):
for query in self.convert_rule(rule, output_format or self.default_format):
queries.append(query)
elif isinstance(rule, SigmaCorrelationRule):
for query in self.convert_correlation_rule(
rule, output_format or self.default_format, correlation_method
)
)
]
):
queries.append(query)
else:
raise TypeError(f"Unexpected rule type: {type(rule)}")
return self.finalize(queries, output_format or self.default_format)

def convert_rule(self, rule: SigmaRule, output_format: Optional[str] = None) -> List[Any]:
Expand Down

0 comments on commit b81d7dc

Please sign in to comment.