From b81d7dca8cdf3c4f30157b3d8392242a76ec7556 Mon Sep 17 00:00:00 2001 From: Thomas Patzke Date: Sun, 2 Mar 2025 01:38:57 +0100 Subject: [PATCH] WIP --- sigma/collection.py | 48 ++++++++++++++++++--------------- sigma/conditions.py | 58 ++++++++++++++++++++++++++++------------ sigma/conversion/base.py | 21 ++++++++------- 3 files changed, 78 insertions(+), 49 deletions(-) diff --git a/sigma/collection.py b/sigma/collection.py index 36985ba..7b065a7 100644 --- a/sigma/collection.py +++ b/sigma/collection.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field from functools import reduce from pathlib import Path -from typing import Callable, Dict, Iterable, List, Optional, Union, IO, cast +from typing import Any, Callable, Dict, Iterable, List, Optional, Union, IO, cast from uuid import UUID import yaml @@ -24,7 +24,7 @@ class SigmaCollection: """Collection of Sigma rules""" - rules: List[Union[SigmaRule, SigmaCorrelationRule]] + rules: List[Union[SigmaRule, SigmaCorrelationRule, SigmaFilter]] errors: List[SigmaError] = field(default_factory=list) ids_to_rules: Dict[UUID, SigmaRuleBase] = field( init=False, repr=False, hash=False, compare=False @@ -58,9 +58,7 @@ def resolve_rule_references(self) -> None: rule.resolve_rule_references(self) # Extract all filters from the rules - filters: List[SigmaFilter] = [ - cast(SigmaFilter, rule) for rule in self.rules if isinstance(rule, SigmaFilter) - ] + filters: List[SigmaFilter] = [rule for rule in self.rules if isinstance(rule, SigmaFilter)] self.rules = [rule for rule in self.rules if not isinstance(rule, SigmaFilter)] # Apply filters on each rule and replace the rule with the filtered rule @@ -93,9 +91,9 @@ def from_dicts( If the collect_errors parameters is set, exceptions are not raised while parsing but collected in the errors property individually for each Sigma rule and the whole SigmaCollection. """ - errors = [] - parsed_rules: List[SigmaRuleBase] = list() - prev_rule = None + errors: List[SigmaError] = [] + parsed_rules: List[Union[SigmaRule, SigmaCorrelationRule, SigmaFilter]] = list() + prev_rule = dict() global_rule: NestedDict = dict() for i, rule in zip(range(1, len(rules) + 1), rules): @@ -156,7 +154,7 @@ def from_dicts( @classmethod def from_yaml( cls, - yaml_str: Union[bytes, str, IO], + yaml_str: Union[bytes, str, IO[Any]], collect_errors: bool = False, source: Optional[SigmaRuleLocation] = None, ) -> "SigmaCollection": @@ -181,10 +179,10 @@ def resolve_paths( paths = ( # Normalize all inputs into paths input if isinstance(input, Path) else Path(input) for input in inputs ) - paths = ( # Recurse into directories if provided + paths_recurse = ( # Recurse into directories if provided path.glob(recursion_pattern) if path.is_dir() else (path,) for path in paths ) - return (subpath for subpaths in paths for subpath in subpaths) # Flatten the list + return (subpath for subpaths in paths_recurse for subpath in subpaths) # Flatten the list @classmethod def load_ruleset( @@ -226,17 +224,23 @@ def load_ruleset( if ( on_beforeload is not None ): # replace path with return value of on_beforeload function if provided - path = on_beforeload(path) - if path is not None: # Skip if path is None + result_path: Optional[Path] = on_beforeload(path) + else: + result_path = path + if result_path is not None: # Skip if path is None sigma_collection = SigmaCollection.from_yaml( - path.open(encoding="utf-8"), collect_errors, SigmaRuleLocation(path) + result_path.open(encoding="utf-8"), + collect_errors, + SigmaRuleLocation(result_path), ) if ( on_load is not None ): # replace SigmaCollection generated from file content with the return value from on_load function if provided - sigma_collection = on_load(path, sigma_collection) - if sigma_collection is not None: # Skip if nothing - sigma_collections.append(sigma_collection) + result_sigma_collection = on_load(result_path, sigma_collection) + else: + result_sigma_collection = sigma_collection + if result_sigma_collection is not None: # Skip if nothing + sigma_collections.append(result_sigma_collection) # Finally merge all SigmaCollection's and return the result return cls.merge(sigma_collections) @@ -245,7 +249,7 @@ def load_ruleset( def merge(cls, collections: Iterable["SigmaCollection"]) -> "SigmaCollection": """Merge multiple SigmaCollection objects into one and return it.""" return cls( - rules=[rule for collection in collections for rule in collection], + rules=[rule for collection in collections for rule in collection.rules], errors=[error for collection in collections for error in collection.errors], ) @@ -257,13 +261,13 @@ def get_unreferenced_rules(self) -> Iterable[SigmaRuleBase]: """Returns an iterator across all rules that are not referenced by any other rule""" return (rule for rule in self.rules if not rule._backreferences) - def __iter__(self): + def __iter__(self) -> Iterable[SigmaRuleBase]: return iter(self.rules) - def __len__(self): + def __len__(self) -> int: return len(self.rules) - def __getitem__(self, i: Union[int, str, UUID]): + def __getitem__(self, i: Union[int, str, UUID]) -> SigmaRuleBase: try: if isinstance(i, int): # Index by position return self.rules[i] @@ -280,7 +284,7 @@ def __getitem__(self, i: Union[int, str, UUID]): raise SigmaRuleNotFoundError(f"Rule '{ i }' not found in rule collection") -def deep_dict_update(dest: SigmaRule, src: SigmaRule) -> "SigmaRule": +def deep_dict_update(dest: Dict[Any, Any], src: Dict[Any, Any]) -> Dict[Any, Any]: for k, v in src.items(): if isinstance(v, dict): dest[k] = deep_dict_update(dest.get(k, {}), v) diff --git a/sigma/conditions.py b/sigma/conditions.py index 5661ec3..baef315 100644 --- a/sigma/conditions.py +++ b/sigma/conditions.py @@ -11,10 +11,11 @@ ParseResults, ParseException, ) -from typing import ClassVar, List, Literal, Optional, Union, Type +from typing import ClassVar, List, Literal, Optional, Union, Type, cast from sigma.types import SigmaType from sigma.exceptions import SigmaConditionError, SigmaRuleLocation import sigma +import sigma.rule @dataclass @@ -50,17 +51,17 @@ def postprocess( detections: "sigma.rule.SigmaDetections", parent: Optional["ConditionItem"] = None, source: Optional[SigmaRuleLocation] = None, - ) -> "ConditionItem": + ) -> "ConditionItem | ConditionFieldEqualsValueExpression | ConditionValueExpression": """ Minimal default postprocessing implementation for classes which don't bring their own postprocess method. Just sets the parent and source property. """ self.parent = parent try: - self.source = source or self.source + self.source: Optional[SigmaRuleLocation] = source or self.source except AttributeError: self.source = None - return self + return cast("ConditionItem", self) @dataclass @@ -71,6 +72,7 @@ class ConditionItem(ParentChainMixin, ABC): ) args: List[ Union[ + "ConditionIdentifier", "ConditionItem", "ConditionFieldEqualsValueExpression", "ConditionValueExpression", @@ -79,17 +81,19 @@ class ConditionItem(ParentChainMixin, ABC): source: Optional[SigmaRuleLocation] = field(default=None, compare=False) @classmethod - def from_parsed(cls, s: str, l: int, t: Union[ParseResults, list]) -> List["ConditionItem"]: + def from_parsed( + cls, s: str, l: int, t: Union[ParseResults, List["ConditionItem"]] + ) -> List["ConditionItem"]: """Create condition object from parse result""" if cls.arg_count == 1: if cls.token_list: args = [t[0]] - else: + elif isinstance(t, ParseResults): args = [t[0][-1]] elif cls.arg_count > 1: if cls.token_list: args = t[0::2] - else: + elif isinstance(t, ParseResults): args = t[0][0::2] else: # pragma: no cover args = list() # this case can only happen if broken classes are defined @@ -100,7 +104,7 @@ def postprocess( detections: "sigma.rule.SigmaDetections", parent: Optional["ConditionItem"] = None, source: Optional[SigmaRuleLocation] = None, - ) -> "ConditionItem": + ) -> "ConditionItem | ConditionFieldEqualsValueExpression | ConditionValueExpression": """ Postprocess condition parse tree after initial parsing. In this stage the detections are available, this allows to resolve references to detections into concrete conditions. @@ -144,11 +148,12 @@ class ConditionNOT(ConditionItem): @dataclass class ConditionIdentifier(ConditionItem): + args: List[str] # type: ignore arg_count: ClassVar[int] = 1 token_list: ClassVar[bool] = True identifier: str = field(init=False) - def __post_init__(self): + def __post_init__(self) -> None: self.identifier = self.args[0] def postprocess( @@ -171,12 +176,13 @@ def postprocess( @dataclass class ConditionSelector(ConditionItem): + args: List[str] # type: ignore arg_count: ClassVar[int] = 2 token_list: ClassVar[bool] = True - cond_class: Union[ConditionAND, ConditionOR] = field(init=False) + cond_class: Union[type[ConditionAND], type[ConditionOR]] = field(init=False) pattern: str = field(init=False) - def __post_init__(self): + def __post_init__(self) -> None: if self.args[0] in ["1", "any"]: self.cond_class = ConditionOR elif self.args[0] == "all": @@ -185,7 +191,9 @@ def __post_init__(self): raise SigmaConditionError("Invalid quantifier in selector", source=self.source) self.pattern = self.args[1] - def resolve_referenced_detections(self, detections: "sigma.rule.SigmaDetections") -> List[str]: + def resolve_referenced_detections( + self, detections: "sigma.rule.SigmaDetections" + ) -> List[ConditionIdentifier]: """ Resolve all detection identifiers referenced by the selector. """ @@ -205,12 +213,24 @@ def postprocess( detections: "sigma.rule.SigmaDetections", parent: Optional["ConditionItem"] = None, source: Optional[SigmaRuleLocation] = None, - ) -> Union[ConditionAND, ConditionOR]: + ) -> Union[ConditionItem, "ConditionFieldEqualsValueExpression", "ConditionValueExpression"]: """Converts selector into an AND or OR condition""" self.parent = parent ids = self.resolve_referenced_detections(detections) - cond = self.cond_class(ids) + cond = self.cond_class( + cast( + List[ + Union[ + ConditionIdentifier, + ConditionItem, + ConditionFieldEqualsValueExpression, + ConditionValueExpression, + ] + ], + ids, + ) + ) return cond.postprocess(detections, parent, source) @@ -254,7 +274,9 @@ class SigmaCondition(ProcessingItemTrackingMixin): detections: "sigma.rule.SigmaDetections" source: Optional[SigmaRuleLocation] = field(default=None, compare=False) - def parse(self, postprocess: bool = True): + def parse( + self, postprocess: bool = True + ) -> Union[ConditionItem, ConditionFieldEqualsValueExpression, ConditionValueExpression]: """ Parse condition and return parse tree (no postprocessing) or condition tree (postprocessed). @@ -268,7 +290,7 @@ def parse(self, postprocess: bool = True): "The pipe syntax in Sigma conditions has been deprecated and replaced by Sigma correlations. pySigma doesn't supports this syntax." ) try: - parsed = condition.parseString(self.condition, parse_all=True)[0] + parsed = cast(ConditionItem, condition.parseString(self.condition, parse_all=True)[0]) if postprocess: return parsed.postprocess(self.detections, source=self.source) else: @@ -277,7 +299,9 @@ def parse(self, postprocess: bool = True): raise SigmaConditionError(str(e), source=self.source) @property - def parsed(self): + def parsed( + self, postprocess: bool = True + ) -> Union[ConditionItem, ConditionFieldEqualsValueExpression, ConditionValueExpression]: """ Parse on first access on parsed condition tree. diff --git a/sigma/conversion/base.py b/sigma/conversion/base.py index d2e9b1f..a2abdf5 100644 --- a/sigma/conversion/base.py +++ b/sigma/conversion/base.py @@ -182,17 +182,18 @@ def convert( processing. """ rule_collection.resolve_rule_references() - queries = [ - query - for rule in rule_collection.rules - for query in ( - self.convert_rule(rule, output_format or self.default_format) - if isinstance(rule, SigmaRule) - else self.convert_correlation_rule( + queries = [] + for rule in rule_collection.rules: + if isinstance(rule, SigmaRule): + for query in self.convert_rule(rule, output_format or self.default_format): + queries.append(query) + elif isinstance(rule, SigmaCorrelationRule): + for query in self.convert_correlation_rule( rule, output_format or self.default_format, correlation_method - ) - ) - ] + ): + queries.append(query) + else: + raise TypeError(f"Unexpected rule type: {type(rule)}") return self.finalize(queries, output_format or self.default_format) def convert_rule(self, rule: SigmaRule, output_format: Optional[str] = None) -> List[Any]: