Skip to content

Commit

Permalink
Integration of condition expressions into processing pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Aug 9, 2024
1 parent 3a470b4 commit 50ca86b
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 58 deletions.
8 changes: 0 additions & 8 deletions sigma/processing/condition_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,4 @@ def parse_condition_expression(
raise SigmaPipelineConditionError(
condition_expression, e.column, f"Error parsing condition expression: {e.msg}"
)

refids = parsed.resolve(conditions)
if len(refids) < len(conditions):
raise SigmaPipelineConditionError(
condition_expression,
parsed.location,
f"Pipeline condition contains unreferenced condition items: {', '.join(set(conditions.keys()) - refids)}",
)
return parsed
128 changes: 93 additions & 35 deletions sigma/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)

from sigma.correlations import SigmaCorrelationRule
from sigma.processing.condition_expressions import ConditionExpression
from sigma.processing.condition_expressions import ConditionExpression, parse_condition_expression
from sigma.processing.finalization import Finalizer, finalizers
from sigma.processing.postprocessing import QueryPostprocessingTransformation
from sigma.processing.tracking import FieldMappingTracking
Expand All @@ -33,7 +33,7 @@
field_name_conditions,
FieldNameProcessingCondition,
)
from sigma.exceptions import SigmaConfigurationError, SigmaTypeError
from sigma.exceptions import SigmaConfigurationError, SigmaPipelineConditionError, SigmaTypeError
import yaml

from sigma.types import SigmaFieldReference, SigmaType
Expand All @@ -59,12 +59,16 @@ def _base_args_from_dict(
) -> dict:
"""Return class instantiation parameters for attributes contained in base class for further
usage in similar methods of classes inherited from this class."""
rule_conds = cls._parse_conditions(rule_conditions, d.get("rule_conditions", list()))
rule_cond_expr_str = d.get("rule_cond_expr", None)
if rule_cond_expr_str is not None:
rule_cond_expr = parse_condition_expression(rule_cond_expr_str, rule_conds)
else:
rule_cond_expr = None
return {
"identifier": d.get("id", None),
"rule_conditions": cls._parse_conditions(
rule_conditions, d.get("rule_conditions", list())
),
"rule_condition_expression": d.get("rule_cond_expr", None),
"rule_conditions": rule_conds,
"rule_condition_expression": rule_cond_expr,
"rule_condition_linking": cls._parse_condition_linking(d, "rule_cond_op"),
"rule_condition_negation": d.get("rule_cond_not", False),
"transformation": cls._instantiate_transformation(d, transformations),
Expand Down Expand Up @@ -124,6 +128,24 @@ def __post_init__(self):
self.transformation.set_processing_item(
self
) # set processing item in transformation object after it is instantiated
self._resolve_condition_expression(
self.rule_condition_expression, self.rule_conditions, "Rule condition"
)

def _resolve_condition_expression(
self,
expr: Optional[ConditionExpression],
conditions: Dict[str, RuleProcessingCondition],
name: str,
) -> None:
if expr is not None:
refids = expr.resolve(conditions)
if len(refids) < len(conditions):
raise SigmaPipelineConditionError(
expr.expression,
expr.location,
f"{name} contains unreferenced condition items: {', '.join(set(conditions.keys()) - refids)}",
)

@classmethod
def _parse_condition(cls, condition_class_mapping, cond_def, ref):
Expand Down Expand Up @@ -198,12 +220,6 @@ def _parse_condition_linking(
}
return condition_linking.get(d.get(op_name, None))

def _parse_condition_expression(
self, condition_type: Literal["rule", "detection_item", "field_name"]
) -> Optional[str]:
condition_expr = self.__getattribute__(f"{condition_type}_cond_expr")
conditions = self.__getattribute__(f"{condition_type}_conditions")

@classmethod
def _instantiate_transformation(cls, d: dict, transformations: Dict[str, Type[Transformation]]):
try:
Expand Down Expand Up @@ -253,9 +269,12 @@ def _match_condition_expression(
def match_rule_conditions(
self, pipeline: "ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule]
):
cond_result = self.rule_condition_linking(
[condition.match(pipeline, rule) for condition in self.rule_conditions]
)
if self.rule_condition_expression is not None: # rule condition expression
cond_result = self.rule_condition_expression.match(rule)
else: # simplified conditional linking of conditions
cond_result = self.rule_condition_linking(
[condition.match(pipeline, rule) for condition in self.rule_conditions]
)
if self.rule_condition_negation:
cond_result = not cond_result
return not self.rule_conditions or cond_result
Expand Down Expand Up @@ -290,20 +309,39 @@ class ProcessingItem(ProcessingItemBase):
def from_dict(cls, d: dict):
"""Instantiate processing item from parsed definition and variables."""
kwargs = super()._base_args_from_dict(d, transformations)

detection_item_conds = cls._parse_conditions(
detection_item_conditions, d.get("detection_item_conditions", list())
)
detection_item_cond_expr_str = d.get("detection_item_cond_expr", None)
if detection_item_cond_expr_str is not None:
detection_item_cond_expr = parse_condition_expression(
detection_item_cond_expr_str, detection_item_conds
)
else:
detection_item_cond_expr = None

field_name_conds = cls._parse_conditions(
field_name_conditions, d.get("field_name_conditions", list())
)
field_name_cond_expr_str = d.get("field_name_cond_expr", None)
if field_name_cond_expr_str is not None:
field_name_cond_expr = parse_condition_expression(
field_name_cond_expr_str, field_name_conds
)
else:
field_name_cond_expr = None

kwargs.update(
{
"detection_item_conditions": cls._parse_conditions(
detection_item_conditions, d.get("detection_item_conditions", list())
),
"detection_item_condition_expression": d.get("detection_item_cond_expr", None),
"detection_item_conditions": detection_item_conds,
"detection_item_condition_expression": detection_item_cond_expr,
"detection_item_condition_linking": cls._parse_condition_linking(
d, "detection_item_cond_op"
),
"detection_item_condition_negation": d.get("detection_item_cond_not", False),
"field_name_conditions": cls._parse_conditions(
field_name_conditions, d.get("field_name_conditions", list())
),
"field_name_condition_expression": d.get("field_name_cond_expr", None),
"field_name_conditions": field_name_conds,
"field_name_condition_expression": field_name_cond_expr,
"field_name_condition_linking": cls._parse_condition_linking(
d, "field_name_cond_op"
),
Expand All @@ -322,13 +360,23 @@ def __post_init__(self):
DetectionItemProcessingCondition,
"Detection item condition",
)
self._resolve_condition_expression(
self.detection_item_condition_expression,
self.detection_item_conditions,
"Detection item condition",
)
self._check_conditions(
"field_name_condition_expression",
"field_name_condition_linking",
"field_name_conditions",
FieldNameProcessingCondition,
"Field name condition",
)
self._resolve_condition_expression(
self.field_name_condition_expression,
self.field_name_conditions,
"Field name condition",
)

def apply(
self, pipeline: "ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule]
Expand All @@ -352,12 +400,19 @@ def match_detection_item(
Evalutates detection item and field name conditions from processing item to detection item
and returns result.
"""
detection_item_cond_result = self.detection_item_condition_linking(
[
condition.match(pipeline, detection_item)
for condition in self.detection_item_conditions
]
)
if (
self.detection_item_condition_expression is not None
): # detection item condition expression
detection_item_cond_result = self.detection_item_condition_expression.match(
detection_item
)
else: # simplified detection item condition
detection_item_cond_result = self.detection_item_condition_linking(
[
condition.match(pipeline, detection_item)
for condition in self.detection_item_conditions
]
)
if self.detection_item_condition_negation:
detection_item_cond_result = not detection_item_cond_result

Expand All @@ -376,12 +431,15 @@ def match_field_name(self, pipeline: "ProcessingPipeline", field: Optional[str])
"""
Evaluate field name conditions on field names and return result.
"""
field_name_cond_result = self.field_name_condition_linking(
[
condition.match_field_name(pipeline, field)
for condition in self.field_name_conditions
]
)
if self.field_name_condition_expression is not None: # field name condition expression
field_name_cond_result = self.field_name_condition_expression.match_field_name(field)
else: # simplified field name condition
field_name_cond_result = self.field_name_condition_linking(
[
condition.match_field_name(pipeline, field)
for condition in self.field_name_conditions
]
)
if self.field_name_condition_negation:
field_name_cond_result = not field_name_cond_result

Expand Down
Loading

0 comments on commit 50ca86b

Please sign in to comment.