From 17185b23e5d18f0977eaf0f96bd941fb6c8d53f1 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Wed, 5 Jul 2023 13:51:42 +0200 Subject: [PATCH] Run black after setting line-length --- sigma/backends/test/backend.py | 4 +- sigma/collection.py | 24 +--- sigma/conditions.py | 8 +- sigma/conversion/base.py | 142 ++++++----------------- sigma/modifiers.py | 37 ++---- sigma/pipelines/common.py | 4 +- sigma/pipelines/test/pipeline.py | 8 +- sigma/plugins.py | 15 +-- sigma/processing/conditions.py | 12 +- sigma/processing/pipeline.py | 57 +++------ sigma/processing/resolver.py | 10 +- sigma/processing/tracking.py | 4 +- sigma/processing/transformations.py | 81 ++++--------- sigma/rule.py | 94 +++++---------- sigma/types.py | 64 +++------- sigma/validation.py | 20 +--- sigma/validators/base.py | 4 +- sigma/validators/core/condition.py | 33 ++---- sigma/validators/core/logsources.py | 8 +- sigma/validators/core/metadata.py | 4 +- sigma/validators/core/modifiers.py | 12 +- sigma/validators/core/tags.py | 29 ++--- sigma/validators/core/values.py | 8 +- tests/test_collection.py | 27 +---- tests/test_conditions.py | 51 +++----- tests/test_conversion_base.py | 100 +++++++--------- tests/test_conversion_state.py | 4 +- tests/test_exceptions.py | 8 +- tests/test_modifiers.py | 136 ++++++++++------------ tests/test_pipelines_common.py | 4 +- tests/test_plugins.py | 33 ++---- tests/test_processing_conditions.py | 16 +-- tests/test_processing_pipeline.py | 69 +++-------- tests/test_processing_resolver.py | 4 +- tests/test_processing_transformations.py | 134 +++++++-------------- tests/test_rule.py | 121 ++++++------------- tests/test_types.py | 57 +++------ tests/test_validation.py | 24 +--- tests/test_validators.py | 19 +-- tools/update-mitre_attack.py | 13 +-- 40 files changed, 446 insertions(+), 1056 deletions(-) diff --git a/sigma/backends/test/backend.py b/sigma/backends/test/backend.py index 2e742239..3cc67ef6 100644 --- a/sigma/backends/test/backend.py +++ b/sigma/backends/test/backend.py @@ -55,9 +55,7 @@ class TextQueryTestBackend(TextQueryBackend): re_escape: ClassVar[Tuple[str]] = ("/", "bar") case_sensitive_match_expression = "{field} casematch {value}" - case_sensitive_startswith_expression: ClassVar[ - str - ] = "{field} startswith_cased {value}" + case_sensitive_startswith_expression: ClassVar[str] = "{field} startswith_cased {value}" case_sensitive_endswith_expression: ClassVar[str] = "{field} endswith_cased {value}" case_sensitive_contains_expression: ClassVar[str] = "{field} contains_cased {value}" diff --git a/sigma/collection.py b/sigma/collection.py index 31a032ef..74defa16 100644 --- a/sigma/collection.py +++ b/sigma/collection.py @@ -13,17 +13,13 @@ class SigmaCollection: rules: List[SigmaRule] errors: List[SigmaError] = field(default_factory=list) - ids_to_rules: Dict[str, SigmaRule] = field( - init=False, repr=False, hash=False, compare=False - ) + ids_to_rules: Dict[str, SigmaRule] = field(init=False, repr=False, hash=False, compare=False) def __post_init__(self): """ Map rule identifiers to rules. """ - self.ids_to_rules = { - str(rule.id): rule for rule in self.rules if rule.id is not None - } + self.ids_to_rules = {str(rule.id): rule for rule in self.rules if rule.id is not None} @classmethod def from_dicts( @@ -70,9 +66,7 @@ def from_dicts( action == "repeat" ): # add content of current rule to previous rule and parse it prev_rule = deep_dict_update(prev_rule, rule) - parsed_rules.append( - SigmaRule.from_dict(prev_rule, collect_errors, source) - ) + parsed_rules.append(SigmaRule.from_dict(prev_rule, collect_errors, source)) else: exception = SigmaCollectionError( f"Unknown Sigma collection action '{ action }' in rule { i }", @@ -98,9 +92,7 @@ def from_yaml( If the collect_errors parameters is set, exceptions are not raised while parsing but collected in the errors property individually for each Sigma rule and the whole SigmaCollection. """ - return cls.from_dicts( - list(yaml.safe_load_all(yaml_str)), collect_errors, source - ) + return cls.from_dicts(list(yaml.safe_load_all(yaml_str)), collect_errors, source) @classmethod def resolve_paths( @@ -118,9 +110,7 @@ def resolve_paths( paths = ( # Recurse into directories if provided path.glob(recursion_pattern) if path.is_dir() else (path,) for path in paths ) - return ( # Flatten the list - subpath for subpaths in paths for subpath in subpaths - ) + return (subpath for subpaths in paths for subpath in subpaths) # Flatten the list @classmethod def load_ruleset( @@ -128,9 +118,7 @@ def load_ruleset( inputs: List[Union[str, Path]], collect_errors: bool = False, on_beforeload: Optional[Callable[[Path], Optional[Path]]] = None, - on_load: Optional[ - Callable[[Path, "SigmaCollection"], Optional["SigmaCollection"]] - ] = None, + on_load: Optional[Callable[[Path, "SigmaCollection"], Optional["SigmaCollection"]]] = None, recursion_pattern: str = "**/*.yml", ) -> "SigmaCollection": """ diff --git a/sigma/conditions.py b/sigma/conditions.py index 6848f368..2f475ba2 100644 --- a/sigma/conditions.py +++ b/sigma/conditions.py @@ -79,9 +79,7 @@ class ConditionItem(ParentChainMixin, ABC): source: Optional[SigmaRuleLocation] = field(default=None, compare=False) @classmethod - def from_parsed( - cls, s: str, l: int, t: Union[ParseResults, list] - ) -> "ConditionItem": + def from_parsed(cls, s: str, l: int, t: Union[ParseResults, list]) -> "ConditionItem": """Create condition object from parse result""" if cls.arg_count == 1: if cls.token_list: @@ -185,9 +183,7 @@ def __post_init__(self): self.cond_class = ConditionAND self.pattern = self.args[1] - def resolve_referenced_detections( - self, detections: "sigma.rule.SigmaDetections" - ) -> List[str]: + def resolve_referenced_detections(self, detections: "sigma.rule.SigmaDetections") -> List[str]: """ Resolve all detection identifiers referenced by the selector. """ diff --git a/sigma/conversion/base.py b/sigma/conversion/base.py index 766fa8ff..be8a0c08 100644 --- a/sigma/conversion/base.py +++ b/sigma/conversion/base.py @@ -99,9 +99,9 @@ class Backend(ABC): processing_pipeline: ProcessingPipeline last_processing_pipeline: ProcessingPipeline backend_processing_pipeline: ClassVar[ProcessingPipeline] = ProcessingPipeline() - output_format_processing_pipeline: ClassVar[ - Dict[str, ProcessingPipeline] - ] = defaultdict(ProcessingPipeline) + output_format_processing_pipeline: ClassVar[Dict[str, ProcessingPipeline]] = defaultdict( + ProcessingPipeline + ) config: Dict[str, Any] default_format: ClassVar[str] = "default" collect_errors: bool = False @@ -125,9 +125,7 @@ def __init__( self.processing_pipeline = processing_pipeline self.collect_errors = collect_errors - def convert( - self, rule_collection: SigmaCollection, output_format: Optional[str] = None - ) -> Any: + def convert(self, rule_collection: SigmaCollection, output_format: Optional[str] = None) -> Any: """ Convert a Sigma ruleset into the target data structure. Usually the result are one or multiple queries, but might also be some arbitrary data structure required for further @@ -140,9 +138,7 @@ def convert( ] return self.finalize(queries, output_format or self.default_format) - def convert_rule( - self, rule: SigmaRule, output_format: Optional[str] = None - ) -> List[Any]: + def convert_rule(self, rule: SigmaRule, output_format: Optional[str] = None) -> List[Any]: """ Convert a single Sigma rule into the target data structure (usually query, see above). """ @@ -150,9 +146,7 @@ def convert_rule( self.last_processing_pipeline = ( self.backend_processing_pipeline + self.processing_pipeline - + self.output_format_processing_pipeline[ - output_format or self.default_format - ] + + self.output_format_processing_pipeline[output_format or self.default_format] ) error_state = "applying processing pipeline on" @@ -161,9 +155,7 @@ def convert_rule( # 2. Convert conditions error_state = "converting" states = [ - ConversionState( - processing_state=dict(self.last_processing_pipeline.state) - ) + ConversionState(processing_state=dict(self.last_processing_pipeline.state)) for _ in rule.detection.parsed_condition ] queries = [ @@ -225,9 +217,7 @@ def decide_convert_condition_as_in_expression( # return False # All arguments of the given condition must reference a field - if not all( - (isinstance(arg, ConditionFieldEqualsValueExpression) for arg in cond.args) - ): + if not all((isinstance(arg, ConditionFieldEqualsValueExpression) for arg in cond.args)): return False # Build a set of all fields appearing in condition arguments @@ -237,9 +227,7 @@ def decide_convert_condition_as_in_expression( return False # All argument values must be strings or numbers - if not all( - [isinstance(arg.value, (SigmaString, SigmaNumber)) for arg in cond.args] - ): + if not all([isinstance(arg.value, (SigmaString, SigmaNumber)) for arg in cond.args]): return False # Check for plain strings if wildcards are not allowed for string expressions. @@ -356,11 +344,7 @@ def convert_condition_field_eq_val_exists( else: return self.convert_condition_not( ConditionNOT( - [ - ConditionFieldEqualsValueExpression( - cond.field, SigmaExists(True) - ) - ], + [ConditionFieldEqualsValueExpression(cond.field, SigmaExists(True))], cond.source, ), state, @@ -380,10 +364,7 @@ def convert_condition_field_eq_expansion( all converted subconditions. """ or_cond = ConditionOR( - [ - ConditionFieldEqualsValueExpression(cond.field, value) - for value in cond.value.values - ], + [ConditionFieldEqualsValueExpression(cond.field, value) for value in cond.value.values], cond.source, ) return self.convert_condition_or(or_cond, state) @@ -446,9 +427,7 @@ def convert_condition_query_expr( ) -> Any: """Conversion of query expressions without field association.""" - def convert_condition_val( - self, cond: ConditionValueExpression, state: ConversionState - ) -> Any: + def convert_condition_val(self, cond: ConditionValueExpression, state: ConversionState) -> Any: """Conversion of value-only conditions.""" if isinstance(cond.value, SigmaString): return self.convert_condition_val_str(cond, state) @@ -500,8 +479,7 @@ def convert_condition(self, cond: ConditionType, state: ConversionState) -> Any: return self.convert_condition_val(cond, state) else: # pragma: no cover raise TypeError( - "Unexpected data type in condition parse tree: " - + cond.__class__.__name__ + "Unexpected data type in condition parse tree: " + cond.__class__.__name__ ) def finalize_query( @@ -519,9 +497,7 @@ def finalize_query( This is the place where syntactic elements of the target format for the specific query are added, e.g. adding query metadata. """ - return self.__getattribute__("finalize_query_" + output_format)( - rule, query, index, state - ) + return self.__getattribute__("finalize_query_" + output_format)(rule, query, index, state) def finalize_query_default( self, rule: SigmaRule, query: Any, index: int, state: ConversionState @@ -591,18 +567,14 @@ class variables. If this is not sufficient, the respective methods can be implem field_escape: ClassVar[ Optional[str] ] = None # Character to escape particular parts defined in field_escape_pattern. - field_escape_quote: ClassVar[ - bool - ] = True # Escape quote string defined in field_quote + field_escape_quote: ClassVar[bool] = True # Escape quote string defined in field_quote field_escape_pattern: ClassVar[ Optional[Pattern] ] = None # All matches of this pattern are prepended with the string contained in field_escape. ## Values ### String quoting - str_quote: ClassVar[ - str - ] = "" # string quoting character (added as escaping character) + str_quote: ClassVar[str] = "" # string quoting character (added as escaping character) str_quote_pattern: ClassVar[ Optional[Pattern] ] = None # Quote string values that match (or don't match) this pattern @@ -611,15 +583,9 @@ class variables. If this is not sufficient, the respective methods can be implem escape_char: ClassVar[ Optional[str] ] = None # Escaping character for special characters inside string - wildcard_multi: ClassVar[ - Optional[str] - ] = None # Character used as multi-character wildcard - wildcard_single: ClassVar[ - Optional[str] - ] = None # Character used as single-character wildcard - add_escaped: ClassVar[ - str - ] = "" # Characters quoted in addition to wildcards and string quote + wildcard_multi: ClassVar[Optional[str]] = None # Character used as multi-character wildcard + wildcard_single: ClassVar[Optional[str]] = None # Character used as single-character wildcard + add_escaped: ClassVar[str] = "" # Characters quoted in addition to wildcards and string quote filter_chars: ClassVar[str] = "" # Characters filtered ### Booleans bool_values: ClassVar[ @@ -652,9 +618,7 @@ class variables. If this is not sufficient, the respective methods can be implem # flag_x placeholders in re_expression template. # By default, i, m and s are defined. If a flag is not supported by the target query language, # remove it from re_flags or don't define it to ensure proper error handling in case of appearance. - re_flags: Dict[ - SigmaRegularExpressionFlag, str - ] = SigmaRegularExpression.sigma_to_re_flag + re_flags: Dict[SigmaRegularExpressionFlag, str] = SigmaRegularExpression.sigma_to_re_flag # Case sensitive string matching expression. String is quoted/escaped like a normal string. # Placeholders {field} and {value} are replaced with field name and quoted/escaped string. @@ -763,9 +727,7 @@ def compare_precedence(self, outer: ConditionItem, inner: ConditionItem) -> bool try: idx_inner = self.precedence.index(inner_class) except ValueError: # ConditionItem not in precedence tuple - idx_inner = ( - -1 - ) # Assume precedence of inner condition item is higher than the outer + idx_inner = -1 # Assume precedence of inner condition item is higher than the outer return idx_inner <= self.precedence.index(outer_class) @@ -799,8 +761,7 @@ def convert_condition_or( else self.convert_condition_group(arg, state) for arg in cond.args ) - if converted is not None - and not isinstance(converted, DeferredQueryExpression) + if converted is not None and not isinstance(converted, DeferredQueryExpression) ) ) except TypeError: # pragma: no cover @@ -814,9 +775,7 @@ def convert_condition_as_in_expression( field=self.escape_and_quote_field( cond.args[0].field ), # The assumption that the field is the same for all argument is valid because this is checked before - op=self.or_in_operator - if isinstance(cond, ConditionOR) - else self.and_in_operator, + op=self.or_in_operator if isinstance(cond, ConditionOR) else self.and_in_operator, list=self.list_separator.join( [ self.convert_value_str(arg.value, state) @@ -848,8 +807,7 @@ def convert_condition_and( else self.convert_condition_group(arg, state) for arg in cond.args ) - if converted is not None - and not isinstance(converted, DeferredQueryExpression) + if converted is not None and not isinstance(converted, DeferredQueryExpression) ) ) except TypeError: # pragma: no cover @@ -861,13 +819,9 @@ def convert_condition_not( """Conversion of NOT conditions.""" arg = cond.args[0] try: - if ( - arg.__class__ in self.precedence - ): # group if AND or OR condition is negated + if arg.__class__ in self.precedence: # group if AND or OR condition is negated return ( - self.not_token - + self.token_separator - + self.convert_condition_group(arg, state) + self.not_token + self.token_separator + self.convert_condition_group(arg, state) ) else: expr = self.convert_condition(arg, state) @@ -896,8 +850,7 @@ def escape_and_quote_field(self, field_name: str) -> str: self.field_escape_pattern is not None ): # Match all occurrences of field_escpae_pattern if defined and initialize match position set with result. match_positions = { - match.start() - for match in self.field_escape_pattern.finditer(field_name) + match.start() for match in self.field_escape_pattern.finditer(field_name) } else: match_positions = set() @@ -906,9 +859,7 @@ def escape_and_quote_field(self, field_name: str) -> str: self.field_escape_quote and self.field_quote is not None ): # Add positions of quote string to match position set re_quote = re.compile(re.escape(self.field_quote)) - match_positions.update( - (match.start() for match in re_quote.finditer(field_name)) - ) + match_positions.update((match.start() for match in re_quote.finditer(field_name))) if len(match_positions) > 0: # found matches, escape them r = [0] + list(sorted(match_positions)) + [len(field_name)] @@ -916,14 +867,10 @@ def escape_and_quote_field(self, field_name: str) -> str: for i in range( len(r) - 1 ): # TODO: from Python 3.10 this can be replaced with itertools.pairwise(), but for now we keep support for Python <3.10 - if ( - i == 0 - ): # The first range is passed to the result without escaping + if i == 0: # The first range is passed to the result without escaping escaped_field_name += field_name[r[i] : r[i + 1]] else: # Subsequent ranges are positions of matches and therefore are prepended with field_escape - escaped_field_name += ( - self.field_escape + field_name[r[i] : r[i + 1]] - ) + escaped_field_name += self.field_escape + field_name[r[i] : r[i + 1]] else: # no matches, just pass original field name without escaping escaped_field_name = field_name else: @@ -985,9 +932,7 @@ def convert_condition_field_eq_val_str( if ( # Check conditions for usage of 'startswith' operator self.startswith_expression is not None # 'startswith' operator is defined in backend - and cond.value.endswith( - SpecialChars.WILDCARD_MULTI - ) # String ends with wildcard + and cond.value.endswith(SpecialChars.WILDCARD_MULTI) # String ends with wildcard and not cond.value[ :-1 ].contains_special() # Remainder of string doesn't contains special characters @@ -1012,8 +957,7 @@ def convert_condition_field_eq_val_str( expr = self.contains_expression value = cond.value[1:-1] elif ( # wildcard match expression: string contains wildcard - self.wildcard_match_expression is not None - and cond.value.contains_special() + self.wildcard_match_expression is not None and cond.value.contains_special() ): expr = self.wildcard_match_expression value = cond.value @@ -1037,9 +981,7 @@ def convert_condition_field_eq_val_str_case_sensitive( if ( # Check conditions for usage of 'startswith' operator self.case_sensitive_startswith_expression is not None # 'startswith' operator is defined in backend - and cond.value.endswith( - SpecialChars.WILDCARD_MULTI - ) # String ends with wildcard + and cond.value.endswith(SpecialChars.WILDCARD_MULTI) # String ends with wildcard and not cond.value[ :-1 ].contains_special() # Remainder of string doesn't contains special characters @@ -1084,11 +1026,7 @@ def convert_condition_field_eq_val_num( ) -> Union[str, DeferredQueryExpression]: """Conversion of field = number value expressions""" try: - return ( - self.escape_and_quote_field(cond.field) - + self.eq_token - + str(cond.value) - ) + return self.escape_and_quote_field(cond.field) + self.eq_token + str(cond.value) except TypeError: # pragma: no cover raise NotImplementedError( "Field equals numeric value expressions are not supported by the backend." @@ -1175,9 +1113,7 @@ def convert_condition_field_eq_val_cidr( expanded = cidr.expand(self.wildcard_multi) expanded_cond = ConditionOR( [ - ConditionFieldEqualsValueExpression( - cond.field, SigmaString(network) - ) + ConditionFieldEqualsValueExpression(cond.field, SigmaString(network)) for network in expanded ], cond.source, @@ -1223,17 +1159,13 @@ def convert_condition_field_eq_val_null( self, cond: ConditionFieldEqualsValueExpression, state: ConversionState ) -> Union[str, DeferredQueryExpression]: """Conversion of field is null expression value expressions""" - return self.field_null_expression.format( - field=self.escape_and_quote_field(cond.field) - ) + return self.field_null_expression.format(field=self.escape_and_quote_field(cond.field)) def convert_condition_field_exists( self, cond: ConditionFieldEqualsValueExpression, state: ConversionState ) -> Union[str, DeferredQueryExpression]: """Conversion of field exists expressions""" - return self.field_exists_expression.format( - field=self.escape_and_quote_field(cond.field) - ) + return self.field_exists_expression.format(field=self.escape_and_quote_field(cond.field)) def convert_condition_field_not_exists( self, cond: ConditionFieldEqualsValueExpression, state: ConversionState diff --git a/sigma/modifiers.py b/sigma/modifiers.py index ae51180b..0f609c14 100644 --- a/sigma/modifiers.py +++ b/sigma/modifiers.py @@ -52,27 +52,21 @@ def __init__( self.applied_modifiers = applied_modifiers self.source = source - def type_check( - self, val: Union[SigmaType, Sequence[SigmaType]], explicit_type=None - ) -> bool: + def type_check(self, val: Union[SigmaType, Sequence[SigmaType]], explicit_type=None) -> bool: th = ( explicit_type or get_type_hints(self.modify)["val"] ) # get type annotation from val parameter of apply method or explicit_type parameter to = get_origin(th) # get possible generic type of type hint if to is None: # Plain type in annotation return isinstance(val, th) - elif ( - to is Union - ): # type hint is Union of multiple types, check if val is one of them + elif to is Union: # type hint is Union of multiple types, check if val is one of them for t in get_args(th): if isinstance(val, t): return True return False elif to is SequenceABC: # type hint is sequence inner_type = get_args(th)[0] - return all( - [self.type_check(item, explicit_type=inner_type) for item in val] - ) + return all([self.type_check(item, explicit_type=inner_type) for item in val]) @abstractmethod def modify( @@ -87,9 +81,7 @@ def apply(self, val: Union[SigmaType, Sequence[SigmaType]]) -> List[SigmaType]: * Ensure returned value is a list * Handle values of SigmaExpansion objects separately. """ - if isinstance( - val, SigmaExpansion - ): # Handle each SigmaExpansion item separately + if isinstance(val, SigmaExpansion): # Handle each SigmaExpansion item separately return [SigmaExpansion([va for v in val.values for va in self.apply(v)])] else: if not self.type_check(val): @@ -223,9 +215,7 @@ def modify(self, val: SigmaString) -> SigmaString: ): # put 0x00 after each character by encoding it to utf-16le and decoding it as utf-8 try: r.append(item.encode("utf-16le").decode("utf-8")) - except ( - UnicodeDecodeError - ): # this method only works for ascii characters + except UnicodeDecodeError: # this method only works for ascii characters raise SigmaValueError( f"Wide modifier only allowed for ascii strings, input string '{str(val)}' isn't one", source=self.source, @@ -253,9 +243,9 @@ def callback(p: Placeholder): yield p return SigmaExpansion( - val.replace_with_placeholder( - re.compile("\\B[-/]\\b"), "_windash" - ).replace_placeholders(callback) + val.replace_with_placeholder(re.compile("\\B[-/]\\b"), "_windash").replace_placeholders( + callback + ) ) @@ -370,9 +360,7 @@ class SigmaFieldReferenceModifier(SigmaValueModifier): def modify(self, val: SigmaString) -> SigmaFieldReference: if val.contains_special(): - raise SigmaValueError( - "Field references must not contain wildcards", source=self.source - ) + raise SigmaValueError("Field references must not contain wildcards", source=self.source) return SigmaFieldReference(val.to_plain()) @@ -381,9 +369,7 @@ class SigmaExistsModifier(SigmaValueModifier): def modify(self, val: SigmaBool) -> SigmaExists: if self.detection_item.field is None: - raise SigmaValueError( - "Exists modifier must be applied to field", source=self.source - ) + raise SigmaValueError("Exists modifier must be applied to field", source=self.source) if len(self.applied_modifiers) > 0: raise SigmaValueError( "Exists modifier only applicable to unmodified boolean values", @@ -433,6 +419,5 @@ def modify(self, val: SigmaString) -> SigmaString: # Mapping from modifier class to identifier reverse_modifier_mapping: Dict[str, str] = { - modifier_class.__name__: identifier - for identifier, modifier_class in modifier_mapping.items() + modifier_class.__name__: identifier for identifier, modifier_class in modifier_mapping.items() } diff --git a/sigma/pipelines/common.py b/sigma/pipelines/common.py index 0bd3d826..40a01fe5 100644 --- a/sigma/pipelines/common.py +++ b/sigma/pipelines/common.py @@ -281,9 +281,7 @@ def generate_windows_logsource_items( AddConditionTransformation( { # source is list cond_field_template.format(service=service, source=source): [ - cond_value_template.format( - service=service, source=source_item - ) + cond_value_template.format(service=service, source=source_item) for source_item in source ] } diff --git a/sigma/pipelines/test/pipeline.py b/sigma/pipelines/test/pipeline.py index aaecaad9..7d9d628c 100644 --- a/sigma/pipelines/test/pipeline.py +++ b/sigma/pipelines/test/pipeline.py @@ -46,13 +46,9 @@ def apply(self): allowed_backends={"another"}, items=[ ProcessingItem( - transformation=AddConditionTransformation( - conditions={"EventID": 1} - ), + transformation=AddConditionTransformation(conditions={"EventID": 1}), rule_conditions=[ - LogsourceCondition( - category="process_creation", product="windows" - ) + LogsourceCondition(category="process_creation", product="windows") ], ), ], diff --git a/sigma/plugins.py b/sigma/plugins.py index 02ed18be..a5c4dca3 100644 --- a/sigma/plugins.py +++ b/sigma/plugins.py @@ -122,9 +122,7 @@ def is_pipeline(obj): possible_obj = submodules[obj_name] # OR'd condition ensures backwards compatibility with older plugins - if is_pipeline(possible_obj) or inspect.isfunction( - possible_obj - ): + if is_pipeline(possible_obj) or inspect.isfunction(possible_obj): if inspect.isclass(possible_obj) and issubclass( possible_obj, Pipeline ): @@ -136,8 +134,7 @@ def is_pipeline(obj): if ( inspect.isclass(submodules[cls_name]) and issubclass(submodules[cls_name], SigmaRuleValidator) - and submodules[cls_name].__module__ - != "sigma.validators.base" + and submodules[cls_name].__module__ != "sigma.validators.base" ): result[cls_name] = submodules[cls_name] elif directory_name == "backends": @@ -164,9 +161,7 @@ def autodiscover( """Automatically discovers backends, pipelines and validators in their corresponding module namespaces and return a InstalledSigmaPlugins class containing all identified classes and generators. """ - backends = cls._discover_module_directories( - sigma.backends, "backends", include_backends - ) + backends = cls._discover_module_directories(sigma.backends, "backends", include_backends) pipelines = cls._discover_module_directories( sigma.pipelines, "pipelines", include_pipelines ) @@ -266,9 +261,7 @@ def install(self): def uninstall(self): """Uninstall plugin with pip.""" - subprocess.check_call( - [sys.executable, "-m", "pip", "-q", "uninstall", "-y", self.package] - ) + subprocess.check_call([sys.executable, "-m", "pip", "-q", "uninstall", "-y", self.package]) @dataclass diff --git a/sigma/processing/conditions.py b/sigma/processing/conditions.py index 7f7b457d..2447ff13 100644 --- a/sigma/processing/conditions.py +++ b/sigma/processing/conditions.py @@ -72,9 +72,7 @@ def match_detection_item_value( """Returns True if any value of a detection item contains a field reference to a field name matching the implemented field name condition. Processing actions must only be applied to matching individual values determined by `match_value`.""" - return any( - (self.match_value(pipeline, value) for value in detection_item.value) - ) + return any((self.match_value(pipeline, value) for value in detection_item.value)) def match_value( self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", value: SigmaType @@ -180,9 +178,7 @@ def match( return True return False - def find_detection_item( - self, detection: Union[SigmaDetectionItem, SigmaDetection] - ) -> bool: + def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool: if isinstance(detection, SigmaDetection): for detection_item in detection.detection_items: if self.find_detection_item(detection_item): @@ -196,9 +192,7 @@ def find_detection_item( ): return True else: - raise TypeError( - "Parameter of type SigmaDetection or SigmaDetectionItem expected." - ) + raise TypeError("Parameter of type SigmaDetection or SigmaDetectionItem expected.") return False diff --git a/sigma/processing/pipeline.py b/sigma/processing/pipeline.py index a4026da4..e49156cc 100644 --- a/sigma/processing/pipeline.py +++ b/sigma/processing/pipeline.py @@ -45,18 +45,12 @@ class ProcessingItem: rule_condition_linking: Callable[[Iterable[bool]], bool] = all # any or all rule_condition_negation: bool = False rule_conditions: List[RuleProcessingCondition] = field(default_factory=list) - detection_item_condition_linking: Callable[ - [Iterable[bool]], bool - ] = all # any or all + detection_item_condition_linking: Callable[[Iterable[bool]], bool] = all # any or all detection_item_condition_negation: bool = False - detection_item_conditions: List[DetectionItemProcessingCondition] = field( - default_factory=list - ) + detection_item_conditions: List[DetectionItemProcessingCondition] = field(default_factory=list) field_name_condition_linking: Callable[[Iterable[bool]], bool] = all # any or all field_name_condition_negation: bool = False - field_name_conditions: List[FieldNameProcessingCondition] = field( - default_factory=list - ) + field_name_conditions: List[FieldNameProcessingCondition] = field(default_factory=list) identifier: Optional[str] = None @classmethod @@ -70,9 +64,7 @@ def from_dict(cls, d: dict): for condition_class_mapping, cond_defs, conds in ( ( # Condition item processing items are defined as follows: rule_conditions, # Dict containing mapping between names used in configuration and classes. - d.get( - "rule_conditions", list() - ), # List of conditions in configuration dict + d.get("rule_conditions", list()), # List of conditions in configuration dict rule_conds := list(), # List where condition classes for ProcessingItem initialization are collected ), ( @@ -189,29 +181,21 @@ def __post_init__(self): f"Rule processing condition '{str(rule_condition)}' is not a RuleProcessingCondition" ) if not isinstance(self.detection_item_conditions, list): - raise SigmaTypeError( - "Detection item processing conditions must be provided as list" - ) + raise SigmaTypeError("Detection item processing conditions must be provided as list") for detection_item_condition in self.detection_item_conditions: - if not isinstance( - detection_item_condition, DetectionItemProcessingCondition - ): + if not isinstance(detection_item_condition, DetectionItemProcessingCondition): raise SigmaTypeError( f"Detection item processing condition '{str(detection_item_condition)}' is not a DetectionItemProcessingCondition" ) if not isinstance(self.field_name_conditions, list): - raise SigmaTypeError( - "Field name processing conditions must be provided as list" - ) + raise SigmaTypeError("Field name processing conditions must be provided as list") for field_name_condition in self.field_name_conditions: if not isinstance(field_name_condition, FieldNameProcessingCondition): raise SigmaTypeError( f"Detection item processing condition '{str(field_name_condition)}' is not a FieldNameProcessingCondition" ) - def apply( - self, pipeline: "ProcessingPipeline", rule: SigmaRule - ) -> Tuple[SigmaRule, bool]: + def apply(self, pipeline: "ProcessingPipeline", rule: SigmaRule) -> Tuple[SigmaRule, bool]: """ Matches condition against rule and performs transformation if condition is true or not present. Returns Sigma rule and bool if transformation was applied. @@ -256,9 +240,7 @@ def match_detection_item( return detection_item_cond_result and field_name_cond_result - def match_field_name( - self, pipeline: "ProcessingPipeline", field: Optional[str] - ) -> bool: + def match_field_name(self, pipeline: "ProcessingPipeline", field: Optional[str]) -> bool: """ Evaluate field name conditions on field names and return result. """ @@ -273,18 +255,13 @@ def match_field_name( return field_name_cond_result - def match_field_in_value( - self, pipeline: "ProcessingPipeline", value: SigmaType - ) -> bool: + def match_field_in_value(self, pipeline: "ProcessingPipeline", value: SigmaType) -> bool: """ Evaluate field name conditions in field reference values and return result. """ if isinstance(value, SigmaFieldReference): field_name_cond_result = self.field_name_condition_linking( - [ - condition.match_value(pipeline, value) - for condition in self.field_name_conditions - ] + [condition.match_value(pipeline, value) for condition in self.field_name_conditions] ) if self.field_name_condition_negation: field_name_cond_result = not field_name_cond_result @@ -387,9 +364,7 @@ def track_field_processing_items( the set of applied processing items from src_field and assigns a copy of this set ass tracking set to all fields in dest_field. """ - if [ - src_field - ] != dest_field: # Only add if source field was mapped to something different. + if [src_field] != dest_field: # Only add if source field was mapped to something different. applied_identifiers: Set = self.field_name_applied_ids[src_field] if processing_item_id is not None: applied_identifiers.add(processing_item_id) @@ -397,9 +372,7 @@ def track_field_processing_items( for field in dest_field: self.field_name_applied_ids[field] = applied_identifiers.copy() - def field_was_processed_by( - self, field: Optional[str], processing_item_id: str - ) -> bool: + def field_was_processed_by(self, field: Optional[str], processing_item_id: str) -> bool: """ Check if field name was processed by a particular processing item. """ @@ -413,9 +386,7 @@ def __add__(self, other: Optional["ProcessingPipeline"]) -> "ProcessingPipeline" return self if not isinstance(other, self.__class__): raise TypeError("Processing pipeline must be merged with another one.") - return self.__class__( - items=self.items + other.items, vars={**self.vars, **other.vars} - ) + return self.__class__(items=self.items + other.items, vars={**self.vars, **other.vars}) def __radd__(self, other: Literal[0]) -> "ProcessingPipeline": """Ignore integer 0 on addition to make sum of list of ProcessingPipelines working.""" diff --git a/sigma/processing/resolver.py b/sigma/processing/resolver.py index a42708be..5f120493 100644 --- a/sigma/processing/resolver.py +++ b/sigma/processing/resolver.py @@ -14,9 +14,9 @@ class ProcessingPipelineResolver: It takes care of sorting by priority and resolution of filenames as well as pipeline name identifiers. """ - pipelines: Dict[ - str, Union[ProcessingPipeline, Callable[[], ProcessingPipeline]] - ] = field(default_factory=dict) + pipelines: Dict[str, Union[ProcessingPipeline, Callable[[], ProcessingPipeline]]] = field( + default_factory=dict + ) def add_pipeline_class(self, pipeline: ProcessingPipeline) -> None: """Add named processing pipeline object to resolver. This pipeline can be resolved by the name.""" @@ -35,9 +35,7 @@ def list_pipelines(self) -> Iterable[Tuple[str, ProcessingPipeline]]: """List identifier/processing pipeline tuples.""" return ((id, self.resolve_pipeline(id)) for id in self.pipelines.keys()) - def resolve_pipeline( - self, spec: str, target: Optional[str] = None - ) -> ProcessingPipeline: + def resolve_pipeline(self, spec: str, target: Optional[str] = None) -> ProcessingPipeline: """ Resolve single processing pipeline. It first tries to find a pipeline with this identifier in the registered pipelines. If this fails, *spec* is treated as file name. If this fails diff --git a/sigma/processing/tracking.py b/sigma/processing/tracking.py index 43840ff5..1a64d49c 100644 --- a/sigma/processing/tracking.py +++ b/sigma/processing/tracking.py @@ -11,9 +11,7 @@ class ProcessingItemTrackingMixin: like detection items and conditions. """ - applied_processing_items: Set[str] = field( - init=False, compare=False, default_factory=set - ) + applied_processing_items: Set[str] = field(init=False, compare=False, default_factory=set) def add_applied_processing_item( self, processing_item: Optional["sigma.processing.pipeline.ProcessingItem"] diff --git a/sigma/processing/transformations.py b/sigma/processing/transformations.py index f67ab2f8..b90d0b72 100644 --- a/sigma/processing/transformations.py +++ b/sigma/processing/transformations.py @@ -46,12 +46,12 @@ def apply( self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule ) -> None: """Apply transformation on Sigma rule.""" - self.pipeline: "sigma.processing.pipeline.ProcessingPipeline" = pipeline # make pipeline accessible from all further options in class property + self.pipeline: "sigma.processing.pipeline.ProcessingPipeline" = ( + pipeline # make pipeline accessible from all further options in class property + ) self.processing_item_applied(rule) - def set_processing_item( - self, processing_item: "sigma.processing.pipeline.ProcessingItem" - ): + def set_processing_item(self, processing_item: "sigma.processing.pipeline.ProcessingItem"): self.processing_item = processing_item def processing_item_applied( @@ -87,16 +87,12 @@ def apply_detection_item( def apply_detection(self, detection: SigmaDetection): for i, detection_item in enumerate(detection.detection_items): - if isinstance( - detection_item, SigmaDetection - ): # recurse into nested detection items + if isinstance(detection_item, SigmaDetection): # recurse into nested detection items self.apply_detection(detection_item) else: if ( self.processing_item is None - or self.processing_item.match_detection_item( - self.pipeline, detection_item - ) + or self.processing_item.match_detection_item(self.pipeline, detection_item) ) and (r := self.apply_detection_item(detection_item)) is not None: if isinstance(r, SigmaDetectionItem): r.disable_conversion_to_plain() @@ -132,9 +128,7 @@ def _apply_field_name( Evaluate field name conditions and perform transformation with apply_field_name() method if condition matches, else return original value. """ - if self.processing_item is None or self.processing_item.match_field_name( - pipeline, field - ): + if self.processing_item is None or self.processing_item.match_field_name(pipeline, field): result = self.apply_field_name(field) if self.processing_item is not None: pipeline.track_field_processing_items( @@ -149,9 +143,7 @@ def apply( ) -> None: """Apply field name transformations to Sigma rule field names listed in 'fields' attribute.""" _apply_field_name = partial(self._apply_field_name, pipeline) - rule.fields = [ - item for mapping in map(_apply_field_name, rule.fields) for item in mapping - ] + rule.fields = [item for mapping in map(_apply_field_name, rule.fields) for item in mapping] return super().apply(pipeline, rule) def apply_detection_item( @@ -161,16 +153,13 @@ def apply_detection_item( new_values = [] match = False for value in detection_item.value: - if ( - self.processing_item is not None - and self.processing_item.match_field_in_value(self.pipeline, value) + if self.processing_item is not None and self.processing_item.match_field_in_value( + self.pipeline, value ): new_values.extend( ( SigmaFieldReference(mapped_field) - for mapped_field in self._apply_field_name( - self.pipeline, value.field - ) + for mapped_field in self._apply_field_name(self.pipeline, value.field) ) ) match = True @@ -260,9 +249,7 @@ def apply( for i, condition in enumerate(rule.detection.parsed_condition): condition_before = condition.condition self.apply_condition(condition) - if ( - condition.condition != condition_before - ): # Condition was changed by transformation, + if condition.condition != condition_before: # Condition was changed by transformation, self.processing_item_applied( condition ) # mark as processed by processing item containing this transformation @@ -290,21 +277,15 @@ def apply_detection_item(self, detection_item: SigmaDetectionItem): super().apply_detection_item(detection_item) field = detection_item.field mapping = self.get_mapping(field) - if mapping is not None and self.processing_item.match_field_name( - self.pipeline, field - ): + if mapping is not None and self.processing_item.match_field_name(self.pipeline, field): self.pipeline.field_mappings.add_mapping(field, mapping) - if isinstance( - mapping, str - ): # 1:1 mapping, map field name of detection item directly + if isinstance(mapping, str): # 1:1 mapping, map field name of detection item directly detection_item.field = mapping self.processing_item_applied(detection_item) else: return SigmaDetection( [ - dataclasses.replace( - detection_item, field=field, auto_modifiers=False - ) + dataclasses.replace(detection_item, field=field, auto_modifiers=False) for field in mapping ], item_linking=ConditionOR, @@ -425,9 +406,7 @@ def is_handled_placeholder(self, p: Placeholder) -> bool: @dataclass -class BasePlaceholderTransformation( - PlaceholderIncludeExcludeMixin, ValueTransformation -): +class BasePlaceholderTransformation(PlaceholderIncludeExcludeMixin, ValueTransformation): """ Placeholder base transformation. The parameters include and exclude can contain variable names that are handled by this transformation. Unhandled placeholders are left as they are and must be handled by @@ -494,9 +473,7 @@ def placeholder_replacements(self, p: Placeholder) -> List[str]: try: values = self.pipeline.vars[p.name] except KeyError: - raise SigmaValueError( - f"Placeholder replacement variable '{ p.name }' doesn't exists." - ) + raise SigmaValueError(f"Placeholder replacement variable '{ p.name }' doesn't exists.") if not isinstance(values, List): values = [values] @@ -510,9 +487,7 @@ def placeholder_replacements(self, p: Placeholder) -> List[str]: @dataclass -class QueryExpressionPlaceholderTransformation( - PlaceholderIncludeExcludeMixin, ValueTransformation -): +class QueryExpressionPlaceholderTransformation(PlaceholderIncludeExcludeMixin, ValueTransformation): """ Replaces a placeholder with a plain query containing the placeholder or an identifier mapped from the placeholder name. The main purpose is the generation of arbitrary @@ -532,14 +507,10 @@ def apply_value( self, field: str, val: SigmaString ) -> Union[SigmaString, Iterable[SigmaString]]: if val.contains_placeholder(): - if ( - len(val.s) == 1 - ): # Sigma string must only contain placeholder, nothing else. + if len(val.s) == 1: # Sigma string must only contain placeholder, nothing else. p = val.s[0] if self.is_handled_placeholder(p): - return SigmaQueryExpression( - self.expression, self.mapping.get(p.name) or p.name - ) + return SigmaQueryExpression(self.expression, self.mapping.get(p.name) or p.name) else: # SigmaString contains placeholder as well as other parts raise SigmaValueError( f"Placeholder query expression transformation only allows placeholder-only strings." @@ -564,9 +535,7 @@ class AddConditionTransformation(ConditionTransformation): def __post_init__(self): if self.name is None: # generate random detection item name if none is given - self.name = "_cond_" + ( - "".join(random.choices(string.ascii_lowercase, k=10)) - ) + self.name = "_cond_" + ("".join(random.choices(string.ascii_lowercase, k=10))) def apply( self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule @@ -594,9 +563,7 @@ def apply( else: conditions = self.conditions - rule.detection.detections[self.name] = SigmaDetection.from_definition( - conditions - ) + rule.detection.detections[self.name] = SigmaDetection.from_definition(conditions) self.processing_item_applied(rule.detection.detections[self.name]) super().apply(pipeline, rule) @@ -653,9 +620,7 @@ class SetStateTransformation(Transformation): key: str val: Any - def apply( - self, pipeline: "sigma.processing.pipeline.Proces", rule: SigmaRule - ) -> None: + def apply(self, pipeline: "sigma.processing.pipeline.Proces", rule: SigmaRule) -> None: super().apply(pipeline, rule) pipeline.state[self.key] = self.val diff --git a/sigma/rule.py b/sigma/rule.py index 68c401dd..867db09b 100644 --- a/sigma/rule.py +++ b/sigma/rule.py @@ -55,9 +55,7 @@ class SigmaRuleTag: source: Optional[SigmaRuleLocation] = field(default=None, compare=False) @classmethod - def from_str( - cls, tag: str, source: Optional[SigmaRuleLocation] = None - ) -> "SigmaRuleTag": + def from_str(cls, tag: str, source: Optional[SigmaRuleLocation] = None) -> "SigmaRuleTag": """Build SigmaRuleTag class from plain text tag string.""" try: ns, n = tag.split(".", maxsplit=1) @@ -174,9 +172,7 @@ def apply_modifiers(self): if isinstance( modifier_instance, SigmaValueModifier ): # Value modifiers are applied to each value separately - self.value = [ - item for val in self.value for item in modifier_instance.apply(val) - ] + self.value = [item for val in self.value for item in modifier_instance.apply(val)] elif isinstance( modifier_instance, SigmaListModifier ): # List modifiers are applied to the whole value list at once @@ -207,9 +203,7 @@ def from_mapping( The value accepts plain values as well as lists of values and resolves them into the value list always contained in a SigmaDetectionItem instance. """ - if ( - key is None - ): # no key at all means pure keyword detection without value modifiers + if key is None: # no key at all means pure keyword detection without value modifiers field = None modifier_ids = list() else: # key-value detection @@ -220,13 +214,9 @@ def from_mapping( try: modifiers = [modifier_mapping[mod_id] for mod_id in modifier_ids] except KeyError as e: - raise sigma_exceptions.SigmaModifierError( - f"Unknown modifier {str(e)}", source=source - ) + raise sigma_exceptions.SigmaModifierError(f"Unknown modifier {str(e)}", source=source) - if isinstance( - val, (int, float, str) - ): # value is plain, convert into single element list + if isinstance(val, (int, float, str)): # value is plain, convert into single element list val = [val] elif val is None: val = [None] @@ -284,8 +274,7 @@ def to_plain(self) -> Union[Dict[str, Union[str, int, None]], List[str]]: self.field or "" ) # field name is empty in case of keyword detection items with modifiers modifier_ids = [ # list of modifier identifiers from reverse mapping - reverse_modifier_mapping[modifier.__name__] - for modifier in self.modifiers + reverse_modifier_mapping[modifier.__name__] for modifier in self.modifiers ] if len(modifier_ids) > 0: modifiers_prefix = "|" @@ -310,31 +299,24 @@ def postprocess( "Null value must be bound to a field", source=self.source ) else: - return ConditionFieldEqualsValueExpression( - self.field, SigmaNull() - ).postprocess(detections, self, self.source) - if ( - len(self.value) == 1 - ): # single value: return key/value or value-only expression + return ConditionFieldEqualsValueExpression(self.field, SigmaNull()).postprocess( + detections, self, self.source + ) + if len(self.value) == 1: # single value: return key/value or value-only expression if self.field is None: return ConditionValueExpression(self.value[0]).postprocess( detections, self, self.source ) else: - return ConditionFieldEqualsValueExpression( - self.field, self.value[0] - ).postprocess(detections, self, self.source) + return ConditionFieldEqualsValueExpression(self.field, self.value[0]).postprocess( + detections, self, self.source + ) else: # more than one value, return logically linked values or an "in" expression if self.field is None: # no field - only values - cond = self.value_linking( - [ConditionValueExpression(v) for v in self.value] - ) + cond = self.value_linking([ConditionValueExpression(v) for v in self.value]) else: # with field - field/value pairs cond = self.value_linking( - [ - ConditionFieldEqualsValueExpression(self.field, v) - for v in self.value - ] + [ConditionFieldEqualsValueExpression(self.field, v) for v in self.value] ) cond.postprocess(detections, parent, self.source) return cond @@ -364,9 +346,7 @@ class SigmaDetection(ParentChainMixin): def __post_init__(self): """Check detection validity.""" if len(self.detection_items) == 0: - raise sigma_exceptions.SigmaDetectionError( - "Detection is empty", source=self.source - ) + raise sigma_exceptions.SigmaDetectionError("Detection is empty", source=self.source) if self.item_linking is None: type_set = {type(item) for item in self.detection_items} @@ -416,23 +396,17 @@ def from_definition( def to_plain(self) -> Union[Dict[str, Union[str, int, None]], List[str]]: """Returns a dictionary or list representation of the detection.""" - detection_items = ( - [ # first convert all detection items into a Python representation. - detection_item.to_plain() for detection_item in self.detection_items - ] - ) + detection_items = [ # first convert all detection items into a Python representation. + detection_item.to_plain() for detection_item in self.detection_items + ] # Filter out where to_plain() returns None, which causes merging errors. # We will have to handle the condition as well in conditions.py detection_items = [ - detection_item - for detection_item in detection_items - if detection_item is not None + detection_item for detection_item in detection_items if detection_item is not None ] - detection_items_types = ( - { # create set of types for decision what has to be returned - type(detection_item) for detection_item in detection_items - } - ) + detection_items_types = { # create set of types for decision what has to be returned + type(detection_item) for detection_item in detection_items + } if len(detection_items) == 0: # pragma: no cover return None # This case is catched by the post init check, so it shouldn't happen. @@ -460,9 +434,7 @@ def to_plain(self) -> Union[Dict[str, Union[str, int, None]], List[str]]: detection_items, self.detection_items ): for k, v in detection_item_converted.items(): - if ( - k not in merged - ): # key doesn't exists in merged dict: just add + if k not in merged: # key doesn't exists in merged dict: just add merged[k] = v else: # key collision, now the things get complicated... if "|all" in k: # key contains 'all' modifier @@ -471,9 +443,7 @@ def to_plain(self) -> Union[Dict[str, Union[str, int, None]], List[str]]: ): # make list from existing all-modified value if it's a plain value merged[k] = [merged[k]] - if isinstance( - v, list - ): # merging two and-linked lists is possible + if isinstance(v, list): # merging two and-linked lists is possible merged[k].extend(v) else: merged[k].append(v) @@ -534,12 +504,9 @@ def postprocess( """Convert detection item into condition tree element""" super().postprocess(detections, parent) items = [ - detection_item.postprocess(detections, self) - for detection_item in self.detection_items + detection_item.postprocess(detections, self) for detection_item in self.detection_items ] - if ( - len(items) == 1 - ): # no boolean linking required, directly return single element + if len(items) == 1: # no boolean linking required, directly return single element return items[0] elif len(items) > 1: condition = self.item_linking(items) @@ -568,9 +535,7 @@ def __post_init__(self): raise sigma_exceptions.SigmaDetectionError( "No detections defined in Sigma rule", source=self.source ) - self.parsed_condition = [ - SigmaCondition(cond, self, self.source) for cond in self.condition - ] + self.parsed_condition = [SigmaCondition(cond, self, self.source) for cond in self.condition] @classmethod def from_dict( @@ -598,8 +563,7 @@ def from_dict( def to_dict(self) -> dict: detections = { - identifier: detection.to_plain() - for identifier, detection in self.detections.items() + identifier: detection.to_plain() for identifier, detection in self.detections.items() } if len(self.condition) > 1: condition = self.condition diff --git a/sigma/types.py b/sigma/types.py index ded61b74..68d3e5a7 100644 --- a/sigma/types.py +++ b/sigma/types.py @@ -119,9 +119,7 @@ def __init__(self, s: Optional[str] = None): r = list() acc = "" # string accumulation until special character appears - escaped = ( - False # escape mode flag: characters in this mode are always accumulated - ) + escaped = False # escape mode flag: characters in this mode are always accumulated for c in s: if escaped: # escaping mode? if ( @@ -131,9 +129,7 @@ def __init__(self, s: Optional[str] = None): else: # accumulate escaping and current character (this allows to use plain backslashes in values) acc += escape_char + c escaped = False - elif ( - c == escape_char - ): # escaping character? enable escaped mode for next character + elif c == escape_char: # escaping character? enable escaped mode for next character escaped = True else: # "normal" string parsing if c in char_mapping: # character is special character? @@ -141,9 +137,7 @@ def __init__(self, s: Optional[str] = None): r.append( acc ) # append accumulated string to parsed result if there was something - r.append( - char_mapping[c] - ) # append special character to parsed result + r.append(char_mapping[c]) # append special character to parsed result acc = "" # accumulation reset else: # characters without special meaning aren't accumulated acc += c @@ -231,9 +225,7 @@ def __getitem__(self, idx: Union[int, slice]) -> "SigmaString": i += 1 - if ( - len(result) == 0 - ): # Special case: start begins after string - return empty string + if len(result) == 0: # Special case: start begins after string - return empty string return SigmaString("") else: # Return calculated result s = SigmaString() @@ -249,9 +241,7 @@ def insert_placeholders(self) -> "SigmaString": for part in self.s: # iterate over all parts and... if isinstance(part, str): # ...search in strings... lastpos = 0 - for m in re.finditer( - "(?\\w+)%", part - ): # ...for placeholders + for m in re.finditer("(?\\w+)%", part): # ...for placeholders s = part[lastpos : m.start()].replace("\\%", "%") if s != "": res.append( @@ -272,9 +262,7 @@ def insert_placeholders(self) -> "SigmaString": return self - def replace_with_placeholder( - self, regex: Pattern, placeholder_name: str - ) -> "SigmaString": + def replace_with_placeholder(self, regex: Pattern, placeholder_name: str) -> "SigmaString": """ Replace all occurrences of string part matching regular expression with placeholder. @@ -330,9 +318,7 @@ def _merge_strs(self) -> "SigmaString": self.s = tuple(res) return self - def __add__( - self, other: Union["SigmaString", str, SpecialChars, Placeholder] - ) -> "SigmaString": + def __add__(self, other: Union["SigmaString", str, SpecialChars, Placeholder]) -> "SigmaString": s = self.__class__() if isinstance(other, self.__class__): s.s = self.s + other.s @@ -361,9 +347,7 @@ def __eq__(self, other: Union["SigmaString", str]) -> bool: ) def __str__(self) -> str: - return "".join( - s if isinstance(s, str) else special_char_mapping[s] for s in self.s - ) + return "".join(s if isinstance(s, str) else special_char_mapping[s] for s in self.s) def __repr__(self) -> str: return str(self.s) @@ -436,9 +420,7 @@ def contains_placeholder( def replace_placeholders( self, - callback: Callable[ - [Placeholder], Iterator[Union[str, SpecialChars, Placeholder]] - ], + callback: Callable[[Placeholder], Iterator[Union[str, SpecialChars, Placeholder]]], ) -> List["SigmaString"]: """ Iterate over all placeholders and call the callback for each one. The callback is called with the placeholder instance @@ -458,9 +440,7 @@ def replace_placeholders( s = self.s for i in range(len(s)): - if isinstance( - s[i], Placeholder - ): # Placeholder instance at index, do replacement + if isinstance(s[i], Placeholder): # Placeholder instance at index, do replacement prefix = SigmaString() prefix.s = s[:i] placeholder = s[i] @@ -504,9 +484,7 @@ def convert( of these characters in a string will raise a SigmaValueError. """ s = "" - escaped_chars = frozenset( - (wildcard_multi or "") + (wildcard_single or "") + add_escaped - ) + escaped_chars = frozenset((wildcard_multi or "") + (wildcard_single or "") + add_escaped) for c in self: if isinstance(c, str): # c is plain character @@ -648,16 +626,12 @@ def escape( pos = [ # determine positions of matches in regular expression m.start() for m in re.finditer(r, self.regexp) ] - ranges = zip( - [None, *pos], [*pos, None] - ) # string chunk ranges with escapes in between + ranges = zip([None, *pos], [*pos, None]) # string chunk ranges with escapes in between ranges = list(ranges) if flag_prefix and self.flags: prefix = ( - "(?" - + "".join(sorted((self.sigma_to_re_flag[flag] for flag in self.flags))) - + ")" + "(?" + "".join(sorted((self.sigma_to_re_flag[flag] for flag in self.flags))) + ")" ) else: prefix = "" @@ -678,9 +652,7 @@ def __post_init__(self): try: self.network = ip_network(self.cidr) except ValueError as e: - raise SigmaTypeError( - "Invalid CIDR expression: " + str(e), source=self.source - ) + raise SigmaTypeError("Invalid CIDR expression: " + str(e), source=self.source) def expand( self, @@ -713,9 +685,7 @@ def expand( if wildcard_group == 0: # Not all groups are static, add wildcard patterns.append(wildcard) elif wildcard_group < 4: # Not all groups are static, add wildcard - patterns.append( - ".".join(subnet_groups[:wildcard_group]) + "." + wildcard - ) + patterns.append(".".join(subnet_groups[:wildcard_group]) + "." + wildcard) else: # /32 - no wildcard is set patterns.append( str(subnet.network_address) @@ -792,9 +762,7 @@ def __post_init__(self): if not isinstance(self.expr, str): raise SigmaTypeError("SigmaQueryExpression expression must be a string") if not isinstance(self.id, str): - raise SigmaTypeError( - "SigmaQueryExpression placeholder identifier must be a string" - ) + raise SigmaTypeError("SigmaQueryExpression placeholder identifier must be a string") def __str__(self): return self.expr diff --git a/sigma/validation.py b/sigma/validation.py index 12529f56..470e622d 100644 --- a/sigma/validation.py +++ b/sigma/validation.py @@ -30,9 +30,7 @@ def __init__( self.exclusions = defaultdict(set, exclusions) @classmethod - def from_dict( - cls, d: Dict, validators: Dict[str, SigmaRuleValidator] - ) -> "SigmaValidator": + def from_dict(cls, d: Dict, validators: Dict[str, SigmaRuleValidator]) -> "SigmaValidator": """ Instantiate SigmaValidator from dict definition. The dict should have the following elements: @@ -78,9 +76,7 @@ def from_dict( exclusion_name ] # main purpose of the generators: resolve identifiers into classes for exclusion_name in ( - rule_exclusions - if isinstance(rule_exclusions, list) - else [rule_exclusions] + rule_exclusions if isinstance(rule_exclusions, list) else [rule_exclusions] ) } for rule_id, rule_exclusions in d.get("exclusions", dict()).items() @@ -112,9 +108,7 @@ def validate_rule(self, rule: SigmaRule) -> List[SigmaValidationIssue]: issues: List[SigmaValidationIssue] = [] exclusions = self.exclusions[rule.id] for validator in self.validators: - if ( - not validator.__class__ in exclusions - ): # Skip if validator is excluded for this rule + if not validator.__class__ in exclusions: # Skip if validator is excluded for this rule issues.extend(validator.validate(rule)) return issues @@ -125,9 +119,7 @@ def finalize(self) -> List[SigmaValidationIssue]: :return: a list of all issues emitted by rule validators on finalization. :rtype: List[SigmaValidationIssue] """ - return [ - issue for validator in self.validators for issue in validator.finalize() - ] + return [issue for validator in self.validators for issue in validator.finalize()] def validate_rules(self, rules: Iterator[SigmaRule]) -> List[SigmaValidationIssue]: """ @@ -139,6 +131,4 @@ def validate_rules(self, rules: Iterator[SigmaRule]) -> List[SigmaValidationIssu :return: A list of SigmaValidationIssue objects describing potential issues. :rtype: List[SigmaValidationIssue] """ - return [ - issue for rule in rules for issue in self.validate_rule(rule) - ] + self.finalize() + return [issue for rule in rules for issue in self.validate_rule(rule)] + self.finalize() diff --git a/sigma/validators/base.py b/sigma/validators/base.py index c4ed1e79..6775f195 100644 --- a/sigma/validators/base.py +++ b/sigma/validators/base.py @@ -46,9 +46,7 @@ def __post_init__(self): def __str__(self): rules = ", ".join( [ - str(rule.source) - if rule.source is not None - else str(rule.id) or rule.title + str(rule.source) if rule.source is not None else str(rule.id) or rule.title for rule in self.rules ] ) diff --git a/sigma/validators/core/condition.py b/sigma/validators/core/condition.py index 600d77b7..3255e43e 100644 --- a/sigma/validators/core/condition.py +++ b/sigma/validators/core/condition.py @@ -13,9 +13,7 @@ @dataclass class DanglingDetectionIssue(SigmaValidationIssue): - description: ClassVar[ - str - ] = "Rule defines detection that is not referenced from condition" + description: ClassVar[str] = "Rule defines detection that is not referenced from condition" severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.HIGH detection_name: str @@ -40,13 +38,8 @@ def condition_referenced_ids( """ if isinstance(cond, ConditionIdentifier): # Only one id referenced. return {cond.identifier} - elif isinstance( - cond, ConditionSelector - ): # Resolve all referenced ids and return - return { - cond.identifier - for cond in cond.resolve_referenced_detections(detections) - } + elif isinstance(cond, ConditionSelector): # Resolve all referenced ids and return + return {cond.identifier for cond in cond.resolve_referenced_detections(detections)} elif isinstance(cond, ConditionItem): # Traverse into subconditions ids = set() for arg in cond.args: @@ -62,14 +55,9 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]: referenced_ids = set() for condition in rule.detection.parsed_condition: parsed_condition = condition.parse(False) - referenced_ids.update( - self.condition_referenced_ids(parsed_condition, rule.detection) - ) + referenced_ids.update(self.condition_referenced_ids(parsed_condition, rule.detection)) - return [ - DanglingDetectionIssue([rule], name) - for name in detection_names - referenced_ids - ] + return [DanglingDetectionIssue([rule], name) for name in detection_names - referenced_ids] @dataclass @@ -96,9 +84,7 @@ class AllOfThemConditionIssue(SigmaValidationIssue): description: ClassVar[ str ] = "Rule contains discouraged 'all of them' condition, use 'all of selection*' instead." - severity: ClassVar[ - SigmaValidationIssueSeverity - ] = SigmaValidationIssueSeverity.MEDIUM + severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM class AllOfThemConditionValidator(SigmaRuleValidator): @@ -107,12 +93,7 @@ class AllOfThemConditionValidator(SigmaRuleValidator): re_all_of_them: ClassVar[Pattern] = re.compile("all\\s+of\\s+them") def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]: - if any( - [ - self.re_all_of_them.search(condition) - for condition in rule.detection.condition - ] - ): + if any([self.re_all_of_them.search(condition) for condition in rule.detection.condition]): return [AllOfThemConditionIssue([rule])] else: return [] diff --git a/sigma/validators/core/logsources.py b/sigma/validators/core/logsources.py index 4a07b5aa..157b7dd7 100644 --- a/sigma/validators/core/logsources.py +++ b/sigma/validators/core/logsources.py @@ -10,9 +10,7 @@ ) from sigma.rule import SigmaLogSource -specific_to_generic_logsource_mapping: Dict[ - str, Tuple[SigmaLogSource, Dict[int, str]] -] = { +specific_to_generic_logsource_mapping: Dict[str, Tuple[SigmaLogSource, Dict[int, str]]] = { # "Sysmon": (SigmaLogSource(None, None, "sysmon"), { SigmaLogSource(None, "windows", "sysmon"): { 1: "process_creation", @@ -82,9 +80,7 @@ def validate_detection_item( rules=[self.rule], logsource=self.logsource, event_id=event_id.number, - generic_logsource=SigmaLogSource( - self.eventid_mappings[event_id.number] - ), + generic_logsource=SigmaLogSource(self.eventid_mappings[event_id.number]), ) for event_id in detection_item.value if isinstance(event_id, SigmaNumber) diff --git a/sigma/validators/core/metadata.py b/sigma/validators/core/metadata.py index be11985a..f88c3089 100644 --- a/sigma/validators/core/metadata.py +++ b/sigma/validators/core/metadata.py @@ -48,7 +48,5 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]: def finalize(self) -> List[SigmaValidationIssue]: return [ - IdentifierCollisionIssue(rules, id) - for id, rules in self.ids.items() - if len(rules) > 1 + IdentifierCollisionIssue(rules, id) for id, rules in self.ids.items() if len(rules) > 1 ] diff --git a/sigma/validators/core/modifiers.py b/sigma/validators/core/modifiers.py index dcdd1e6d..6f4a977b 100644 --- a/sigma/validators/core/modifiers.py +++ b/sigma/validators/core/modifiers.py @@ -36,9 +36,7 @@ class Base64OffsetWithoutContainsModifierIssue(SigmaValidationIssue): @dataclass class ModifierAppliedMultipleIssue(SigmaValidationIssue): description: ClassVar[str] = "Modifiers shouldn't be applied multiple times" - severity: ClassVar[ - SigmaValidationIssueSeverity - ] = SigmaValidationIssueSeverity.MEDIUM + severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM detection_item: SigmaDetectionItem modifiers: Set[Type[SigmaModifier]] @@ -64,9 +62,7 @@ def validate_detection_item( SigmaBase64OffsetModifier in detection_item.modifiers and SigmaContainsModifier not in detection_item.modifiers ): - issues.append( - Base64OffsetWithoutContainsModifierIssue([self.rule], detection_item) - ) + issues.append(Base64OffsetWithoutContainsModifierIssue([self.rule], detection_item)) # Check for multiple appliance of modifiers mod_count = Counter(detection_item.modifiers) @@ -77,9 +73,7 @@ def validate_detection_item( } if multiple_modifiers: issues.append( - ModifierAppliedMultipleIssue( - [self.rule], detection_item, multiple_modifiers - ) + ModifierAppliedMultipleIssue([self.rule], detection_item, multiple_modifiers) ) return issues diff --git a/sigma/validators/core/tags.py b/sigma/validators/core/tags.py index 2595ce62..ae3c0a4c 100644 --- a/sigma/validators/core/tags.py +++ b/sigma/validators/core/tags.py @@ -19,9 +19,7 @@ @dataclass class InvalidATTACKTagIssue(SigmaValidationIssue): description: ClassVar[str] = "Invalid MITRE ATT&CK tagging" - severity: ClassVar[ - SigmaValidationIssueSeverity - ] = SigmaValidationIssueSeverity.MEDIUM + severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM tag: SigmaRuleTag @@ -30,14 +28,9 @@ class ATTACKTagValidator(SigmaTagValidator): def __init__(self) -> None: self.allowed_tags = ( - { - tactic.lower().replace("-", "_") - for tactic in mitre_attack_tactics.values() - } + {tactic.lower().replace("-", "_") for tactic in mitre_attack_tactics.values()} .union({technique.lower() for technique in mitre_attack_techniques.keys()}) - .union( - {intrusion_set.lower() for intrusion_set in mitre_attack_intrusion_sets} - ) + .union({intrusion_set.lower() for intrusion_set in mitre_attack_intrusion_sets}) .union({software.lower() for software in mitre_attack_software}) ) @@ -50,9 +43,7 @@ def validate_tag(self, tag: SigmaRuleTag) -> List[SigmaValidationIssue]: @dataclass class InvalidTLPTagIssue(SigmaValidationIssue): description: ClassVar[str] = "Invalid TLP tagging" - severity: ClassVar[ - SigmaValidationIssueSeverity - ] = SigmaValidationIssueSeverity.MEDIUM + severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM tag: SigmaRuleTag @@ -91,17 +82,13 @@ class TLPv2TagValidator(TLPTagValidatorBase): class TLPTagValidator(TLPTagValidatorBase): """Validation of TLP tags from all versions of the TLP standard.""" - allowed_tags: Set[str] = TLPv1TagValidator.allowed_tags.union( - TLPv2TagValidator.allowed_tags - ) + allowed_tags: Set[str] = TLPv1TagValidator.allowed_tags.union(TLPv2TagValidator.allowed_tags) @dataclass class DuplicateTagIssue(SigmaValidationIssue): description: ClassVar[str] = "The same tag appears mutliple times" - severity: ClassVar[ - SigmaValidationIssueSeverity - ] = SigmaValidationIssueSeverity.MEDIUM + severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM tag: SigmaRuleTag @@ -110,6 +97,4 @@ class DuplicateTagValidator(SigmaRuleValidator): def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]: tags = Counter(rule.tags) - return [ - DuplicateTagIssue([rule], tag) for tag, count in tags.items() if count > 1 - ] + return [DuplicateTagIssue([rule], tag) for tag, count in tags.items() if count > 1] diff --git a/sigma/validators/core/values.py b/sigma/validators/core/values.py index ebb89870..297e1abf 100644 --- a/sigma/validators/core/values.py +++ b/sigma/validators/core/values.py @@ -74,9 +74,7 @@ class ControlCharacterValidator(SigmaStringValueValidator): """ def validate_value(self, value: SigmaString) -> List[SigmaValidationIssue]: - if any( - (ord(c) < 31 for s in value.s for c in (s if isinstance(s, str) else "")) - ): + if any((ord(c) < 31 for s in value.s for c in (s if isinstance(s, str) else ""))): return [ControlCharacterIssue([self.rule], value)] else: return [] @@ -129,9 +127,7 @@ def validate_detection_item( ) and SigmaContainsModifier not in detection_item.modifiers ): - return [ - WildcardsInsteadOfContainsModifierIssue([self.rule], detection_item) - ] + return [WildcardsInsteadOfContainsModifierIssue([self.rule], detection_item)] elif ( all( ( diff --git a/tests/test_collection.py b/tests/test_collection.py index 3dbc6138..21024f2f 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -22,9 +22,7 @@ def test_single_rule(): }, } - assert SigmaCollection.from_dicts([rule]) == SigmaCollection( - [SigmaRule.from_dict(rule)] - ) + assert SigmaCollection.from_dicts([rule]) == SigmaCollection([SigmaRule.from_dict(rule)]) def test_merge(): @@ -151,9 +149,7 @@ def test_action_reset(): ] ) assert ( - len(c) == 1 - and c[0].title == "Test" - and c[0].logsource == SigmaLogSource(service="testsvc") + len(c) == 1 and c[0].title == "Test" and c[0].logsource == SigmaLogSource(service="testsvc") ) @@ -256,9 +252,7 @@ def test_load_ruleset_path(): def test_load_ruleset_with_error(): - with pytest.raises( - SigmaModifierError, match="Unknown modifier.*test_rule_with_error.yml" - ): + with pytest.raises(SigmaModifierError, match="Unknown modifier.*test_rule_with_error.yml"): SigmaCollection.load_ruleset([Path("tests/files/ruleset_with_errors")]) @@ -275,11 +269,7 @@ def onbeforeload(p): return p assert ( - len( - SigmaCollection.load_ruleset( - ["tests/files/ruleset"], on_beforeload=onbeforeload - ).rules - ) + len(SigmaCollection.load_ruleset(["tests/files/ruleset"], on_beforeload=onbeforeload).rules) == 1 ) @@ -292,13 +282,8 @@ def onload(p, sc): sc.rules[0].title = "changed" return sc - sigma_collection = SigmaCollection.load_ruleset( - ["tests/files/ruleset"], on_load=onload - ) - assert ( - len(sigma_collection.rules) == 1 - and sigma_collection.rules[0].title == "changed" - ) + sigma_collection = SigmaCollection.load_ruleset(["tests/files/ruleset"], on_load=onload) + assert len(sigma_collection.rules) == 1 and sigma_collection.rules[0].title == "changed" def test_index_rule_by_position(ruleset): diff --git a/tests/test_conditions.py b/tests/test_conditions.py index c3db8ba0..8bfb7d56 100644 --- a/tests/test_conditions.py +++ b/tests/test_conditions.py @@ -183,9 +183,9 @@ def test_and(sigma_simple_detections): def test_not(sigma_simple_detections): - assert SigmaCondition( - "not detection1", sigma_simple_detections - ).parsed == ConditionNOT([ConditionValueExpression(SigmaString("val1"))]) + assert SigmaCondition("not detection1", sigma_simple_detections).parsed == ConditionNOT( + [ConditionValueExpression(SigmaString("val1"))] + ) def test_3or(sigma_simple_detections): @@ -228,17 +228,10 @@ def test_precedence_parent_chain_condition_classes(sigma_simple_detections): sigma_simple_detections, ).parsed assert ( - parsed.args[0].args[0].parent_chain_condition_classes() - == [ConditionAND, ConditionOR] - and parsed.args[0] # detection1 - .args[1] - .args[0] - .parent_chain_condition_classes() + parsed.args[0].args[0].parent_chain_condition_classes() == [ConditionAND, ConditionOR] + and parsed.args[0].args[1].args[0].parent_chain_condition_classes() # detection1 == [ConditionNOT, ConditionAND, ConditionOR] - and parsed.args[1] # detection2 - .args[0] - .args[0] - .parent_chain_condition_classes() + and parsed.args[1].args[0].args[0].parent_chain_condition_classes() # detection2 == [ConditionNOT, ConditionAND, ConditionOR] and parsed.args[1].args[1].parent_chain_condition_classes() # detection3 == [ConditionAND, ConditionOR] # detection_4 @@ -309,9 +302,7 @@ def test_precedence_parenthesis_parent_chain_condition_classes(sigma_simple_dete def test_selector_1(sigma_simple_detections): - assert SigmaCondition( - "1 of detection*", sigma_simple_detections - ).parsed == ConditionOR( + assert SigmaCondition("1 of detection*", sigma_simple_detections).parsed == ConditionOR( [ ConditionValueExpression(SigmaString("val1")), ConditionValueExpression(SigmaString("val2")), @@ -345,9 +336,7 @@ def test_selector_1_of_them(sigma_simple_detections): def test_selector_any(sigma_simple_detections): - assert SigmaCondition( - "any of detection*", sigma_simple_detections - ).parsed == ConditionOR( + assert SigmaCondition("any of detection*", sigma_simple_detections).parsed == ConditionOR( [ ConditionValueExpression(SigmaString("val1")), ConditionValueExpression(SigmaString("val2")), @@ -370,9 +359,7 @@ def test_selector_any_of_them(sigma_simple_detections): def test_selector_all(sigma_simple_detections): - assert SigmaCondition( - "all of detection*", sigma_simple_detections - ).parsed == ConditionAND( + assert SigmaCondition("all of detection*", sigma_simple_detections).parsed == ConditionAND( [ ConditionValueExpression(SigmaString("val1")), ConditionValueExpression(SigmaString("val2")), @@ -383,9 +370,7 @@ def test_selector_all(sigma_simple_detections): def test_selector_all_of_them(sigma_simple_detections): - assert SigmaCondition( - "all of them", sigma_simple_detections - ).parsed == ConditionAND( + assert SigmaCondition("all of them", sigma_simple_detections).parsed == ConditionAND( [ ConditionValueExpression(SigmaString("val1")), ConditionValueExpression(SigmaString("val2")), @@ -435,9 +420,7 @@ def test_field_value_detection(sigma_detections): def test_field_valuelist_with_wildcards_detection(sigma_detections): - assert SigmaCondition( - "field-valuelist-wildcards", sigma_detections - ).parsed == ConditionOR( + assert SigmaCondition("field-valuelist-wildcards", sigma_detections).parsed == ConditionOR( [ ConditionFieldEqualsValueExpression("field", SigmaString("simple-value")), ConditionFieldEqualsValueExpression("field", SigmaString("*wildcards*")), @@ -446,14 +429,10 @@ def test_field_valuelist_with_wildcards_detection(sigma_detections): def test_field_valuelist_with_regex_detection(sigma_detections): - assert SigmaCondition( - "field-valuelist-regex", sigma_detections - ).parsed == ConditionOR( + assert SigmaCondition("field-valuelist-regex", sigma_detections).parsed == ConditionOR( [ ConditionFieldEqualsValueExpression("field", SigmaString("simple-value")), - ConditionFieldEqualsValueExpression( - "field", SigmaRegularExpression("reg.*ex") - ), + ConditionFieldEqualsValueExpression("field", SigmaRegularExpression("reg.*ex")), ] ) @@ -495,9 +474,7 @@ def test_invalid_conditions(condition, sigma_simple_detections): def test_deprecated_pipe_syntax(sigma_simple_detections): with pytest.raises(SigmaConditionError, match="deprecated"): - SigmaCondition( - "detection | count() by src_ip > 50", sigma_simple_detections - ).parsed + SigmaCondition("detection | count() by src_ip > 50", sigma_simple_detections).parsed def test_and_condition_has_parent(sigma_simple_detections): diff --git a/tests/test_conversion_base.py b/tests/test_conversion_base.py index f680b1ab..a72a7658 100644 --- a/tests/test_conversion_base.py +++ b/tests/test_conversion_base.py @@ -699,9 +699,7 @@ def test_convert_query_expr(): pipeline = ProcessingPipeline( [ ProcessingItem( - QueryExpressionPlaceholderTransformation( - expression="{field} in list({id})" - ) + QueryExpressionPlaceholderTransformation(expression="{field} in list({id})") ) ] ) @@ -729,11 +727,7 @@ def test_convert_query_expr(): def test_convert_query_expr_unbound(): pipeline = ProcessingPipeline( - [ - ProcessingItem( - QueryExpressionPlaceholderTransformation(expression="_ in list({id})") - ) - ] + [ProcessingItem(QueryExpressionPlaceholderTransformation(expression="_ in list({id})"))] ) backend = TextQueryTestBackend(pipeline) assert ( @@ -796,9 +790,7 @@ def test_convert_value_regex_flag_prefix(test_backend): """ ) ) - == [ - "mappedA=/(?ims)pat.*tern\\/foo\\bar/ and 'field A'=/(?ims)pat.*te\\\\rn\\/foo\\bar/" - ] + == ["mappedA=/(?ims)pat.*tern\\/foo\\bar/ and 'field A'=/(?ims)pat.*te\\\\rn\\/foo\\bar/"] ) @@ -823,9 +815,7 @@ def test_convert_value_regex_flag_explicit(test_backend): """ ) ) - == [ - "mappedA=/pat.*tern\\/foo\\bar/ims and 'field A'=/pat.*te\\\\rn\\/foo\\bar/ims" - ] + == ["mappedA=/pat.*tern\\/foo\\bar/ims and 'field A'=/pat.*te\\\\rn\\/foo\\bar/ims"] ) @@ -1039,9 +1029,7 @@ def test_convert_value_cidr_wildcard_native_ipv4(test_backend): """ ) ) - == [ - "cidrmatch('mappedA', \"192.168.0.0/14\") and cidrmatch('field A', \"192.168.0.0/14\")" - ] + == ["cidrmatch('mappedA', \"192.168.0.0/14\") and cidrmatch('field A', \"192.168.0.0/14\")"] ) @@ -1789,9 +1777,7 @@ def test_convert_unbound_values(test_backend): def test_convert_invalid_unbound_bool(test_backend): - with pytest.raises( - SigmaValueError, match="Boolean values can't appear as standalone" - ): + with pytest.raises(SigmaValueError, match="Boolean values can't appear as standalone"): test_backend.convert( SigmaCollection.from_yaml( """ @@ -1992,9 +1978,7 @@ def test_convert_precedence(test_backend): """ ) ) - == [ - '(mappedA="value1" or mappedB="value2") and not (fieldC="value3" and fieldD="value4")' - ] + == ['(mappedA="value1" or mappedB="value2") and not (fieldC="value3" and fieldD="value4")'] ) @@ -2077,9 +2061,7 @@ def test_convert_list_cidr_wildcard_none(test_backend): """ ) ) - == [ - "cidrmatch('mappedA', \"192.168.0.0/14\") or cidrmatch('mappedA', \"10.10.10.0/24\")" - ] + == ["cidrmatch('mappedA', \"192.168.0.0/14\") or cidrmatch('mappedA', \"10.10.10.0/24\")"] ) @@ -2209,38 +2191,40 @@ def test_convert_dropped_detection_item_or(): quote_always_escape_quote_config = ("'", None, True, "\\", True, None) quote_only_config = ("'", re.compile("^.*\s"), False, None, False, None) escape_only_config = (None, None, True, "\\", False, re.compile("[\s]")) -quoting_escaping_testcases = { # test name -> quoting/escaping parameters (see test below), test field name, expected result - "escape_quote_nothing": (quote_escape_config, "foo.bar", "foo.bar"), - "escape_quote_quoteonly": (quote_escape_config, "foo bar", "'foo bar'"), - "escape_quote_quote_and_escape": ( - quote_escape_config, - "foo bar;foo;bar", - "'foo bar\\;foo\\;bar'", - ), - "escape_quote_escapeonly": ( - quote_escape_config, - "foobar;foo;bar", - "foobar\\;foo\\;bar", - ), - "escape_quote_escapeonly_start_end": ( - quote_escape_config, - ";foo;bar;", - "\\;foo\\;bar\\;", - ), - "escape_quote_quote_and_escape_quotechar": ( - quote_escape_config, - "foo bar';foo;bar", - "'foo bar\\'\\;foo\\;bar'", - ), - "quote_always_escape_quote_only": ( - quote_always_escape_quote_config, - "foo 'bar'", - "'foo \\'bar\\''", - ), - "quote_only": (quote_only_config, "foo bar", "'foo bar'"), - "escape_only": (escape_only_config, "foo bar", "foo\\ bar"), - "escape_only_start_end": (escape_only_config, " foo bar ", "\\ foo\\ bar\\ "), -} +quoting_escaping_testcases = ( + { # test name -> quoting/escaping parameters (see test below), test field name, expected result + "escape_quote_nothing": (quote_escape_config, "foo.bar", "foo.bar"), + "escape_quote_quoteonly": (quote_escape_config, "foo bar", "'foo bar'"), + "escape_quote_quote_and_escape": ( + quote_escape_config, + "foo bar;foo;bar", + "'foo bar\\;foo\\;bar'", + ), + "escape_quote_escapeonly": ( + quote_escape_config, + "foobar;foo;bar", + "foobar\\;foo\\;bar", + ), + "escape_quote_escapeonly_start_end": ( + quote_escape_config, + ";foo;bar;", + "\\;foo\\;bar\\;", + ), + "escape_quote_quote_and_escape_quotechar": ( + quote_escape_config, + "foo bar';foo;bar", + "'foo bar\\'\\;foo\\;bar'", + ), + "quote_always_escape_quote_only": ( + quote_always_escape_quote_config, + "foo 'bar'", + "'foo \\'bar\\''", + ), + "quote_only": (quote_only_config, "foo bar", "'foo bar'"), + "escape_only": (escape_only_config, "foo bar", "foo\\ bar"), + "escape_only_start_end": (escape_only_config, " foo bar ", "\\ foo\\ bar\\ "), + } +) @pytest.mark.parametrize( diff --git a/tests/test_conversion_state.py b/tests/test_conversion_state.py index 80b5996e..60c872a7 100644 --- a/tests/test_conversion_state.py +++ b/tests/test_conversion_state.py @@ -18,8 +18,6 @@ def test_conversion_state_empty_has_deferred(conversion_state: ConversionState): assert conversion_state.has_deferred() == False -def test_conversion_state_has_deferred( - conversion_state: ConversionState, deferred_expression -): +def test_conversion_state_has_deferred(conversion_state: ConversionState, deferred_expression): conversion_state.add_deferred_expression(deferred_expression) assert conversion_state.has_deferred() == True diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 95504729..e3fdac52 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -36,9 +36,7 @@ def test_sigmalocation_file(sigma_rule_location): def test_sigmalocation_file_with_line(sigma_rule_location_with_line): locstr = str(sigma_rule_location_with_line) - assert locstr == "/path/to/sigma_rule.yml:5" or locstr.endswith( - "\\path\\to\\sigma_rule.yml:5" - ) + assert locstr == "/path/to/sigma_rule.yml:5" or locstr.endswith("\\path\\to\\sigma_rule.yml:5") def test_sigmalocation_file_with_line_and_char(sigma_rule_location_with_line_and_char): @@ -49,9 +47,7 @@ def test_sigmalocation_file_with_line_and_char(sigma_rule_location_with_line_and def test_exception_with_location(sigma_rule_location_with_line_and_char): - errstr = str( - SigmaDetectionError("Test", source=sigma_rule_location_with_line_and_char) - ) + errstr = str(SigmaDetectionError("Test", source=sigma_rule_location_with_line_and_char)) assert errstr == "Test in /path/to/sigma_rule.yml:5:3" or re.match( "Test in \\w:\\\\path\\\\to\\\\sigma_rule.yml:5:3", errstr ) diff --git a/tests/test_modifiers.py b/tests/test_modifiers.py index a5248e7a..0ef973a5 100644 --- a/tests/test_modifiers.py +++ b/tests/test_modifiers.py @@ -109,70 +109,68 @@ def test_typecheck_sequence_wrong(dummy_sequence_modifier): def test_contains_nowildcards(dummy_detection_item): - assert SigmaContainsModifier(dummy_detection_item, []).apply( - SigmaString("foobar") - ) == [SigmaString("*foobar*")] + assert SigmaContainsModifier(dummy_detection_item, []).apply(SigmaString("foobar")) == [ + SigmaString("*foobar*") + ] def test_contains_leading_wildcard(dummy_detection_item): - assert SigmaContainsModifier(dummy_detection_item, []).apply( - SigmaString("*foobar") - ) == [SigmaString("*foobar*")] + assert SigmaContainsModifier(dummy_detection_item, []).apply(SigmaString("*foobar")) == [ + SigmaString("*foobar*") + ] def test_contains_trailing_wildcard(dummy_detection_item): - assert SigmaContainsModifier(dummy_detection_item, []).apply( - SigmaString("foobar*") - ) == [SigmaString("*foobar*")] + assert SigmaContainsModifier(dummy_detection_item, []).apply(SigmaString("foobar*")) == [ + SigmaString("*foobar*") + ] def test_contains_leading_and_trailing_wildcard(dummy_detection_item): - assert SigmaContainsModifier(dummy_detection_item, []).apply( + assert SigmaContainsModifier(dummy_detection_item, []).apply(SigmaString("*foobar*")) == [ SigmaString("*foobar*") - ) == [SigmaString("*foobar*")] + ] def test_startswith_nowildcards(dummy_detection_item): - assert SigmaStartswithModifier(dummy_detection_item, []).apply( - SigmaString("foobar") - ) == [SigmaString("foobar*")] + assert SigmaStartswithModifier(dummy_detection_item, []).apply(SigmaString("foobar")) == [ + SigmaString("foobar*") + ] def test_startswith_trailing_wildcard(dummy_detection_item): - assert SigmaStartswithModifier(dummy_detection_item, []).apply( + assert SigmaStartswithModifier(dummy_detection_item, []).apply(SigmaString("foobar*")) == [ SigmaString("foobar*") - ) == [SigmaString("foobar*")] + ] def test_endswith_nowildcards(dummy_detection_item): - assert SigmaEndswithModifier(dummy_detection_item, []).apply( - SigmaString("foobar") - ) == [SigmaString("*foobar")] + assert SigmaEndswithModifier(dummy_detection_item, []).apply(SigmaString("foobar")) == [ + SigmaString("*foobar") + ] def test_endswith_trailing_wildcard(dummy_detection_item): - assert SigmaEndswithModifier(dummy_detection_item, []).apply( + assert SigmaEndswithModifier(dummy_detection_item, []).apply(SigmaString("*foobar")) == [ SigmaString("*foobar") - ) == [SigmaString("*foobar")] + ] def test_base64(dummy_detection_item): - assert SigmaBase64Modifier(dummy_detection_item, []).apply( - SigmaString("foobar") - ) == [SigmaString("Zm9vYmFy")] + assert SigmaBase64Modifier(dummy_detection_item, []).apply(SigmaString("foobar")) == [ + SigmaString("Zm9vYmFy") + ] def test_base64_wildcards(dummy_detection_item): with pytest.raises(SigmaValueError, match="wildcards is not allowed.*test.yml"): - SigmaBase64Modifier( - dummy_detection_item, [], SigmaRuleLocation("test.yml") - ).apply(SigmaString("foo*bar")) + SigmaBase64Modifier(dummy_detection_item, [], SigmaRuleLocation("test.yml")).apply( + SigmaString("foo*bar") + ) def test_base64offset(dummy_detection_item): - assert SigmaBase64OffsetModifier(dummy_detection_item, []).apply( - SigmaString("foobar") - ) == [ + assert SigmaBase64OffsetModifier(dummy_detection_item, []).apply(SigmaString("foobar")) == [ SigmaExpansion( [ SigmaString("Zm9vYmFy"), @@ -185,29 +183,29 @@ def test_base64offset(dummy_detection_item): def test_base64offset_wildcards(dummy_detection_item): with pytest.raises(SigmaValueError, match="wildcards is not allowed.*test.yml"): - SigmaBase64OffsetModifier( - dummy_detection_item, [], SigmaRuleLocation("test.yml") - ).apply(SigmaString("foo*bar")) + SigmaBase64OffsetModifier(dummy_detection_item, [], SigmaRuleLocation("test.yml")).apply( + SigmaString("foo*bar") + ) def test_base64offset_re(dummy_detection_item): with pytest.raises(SigmaTypeError, match="incompatible.*type.*test.yml"): - SigmaBase64OffsetModifier( - dummy_detection_item, [], SigmaRuleLocation("test.yml") - ).apply(SigmaRegularExpression("foo.*bar")) + SigmaBase64OffsetModifier(dummy_detection_item, [], SigmaRuleLocation("test.yml")).apply( + SigmaRegularExpression("foo.*bar") + ) def test_wide(dummy_detection_item): - assert SigmaWideModifier(dummy_detection_item, []).apply( - SigmaString("*foobar*") - ) == [SigmaString("*f\x00o\x00o\x00b\x00a\x00r\x00*")] + assert SigmaWideModifier(dummy_detection_item, []).apply(SigmaString("*foobar*")) == [ + SigmaString("*f\x00o\x00o\x00b\x00a\x00r\x00*") + ] def test_wide_noascii(dummy_detection_item): with pytest.raises(SigmaValueError, match="ascii strings.*test.yml"): - SigmaWideModifier( - dummy_detection_item, [], SigmaRuleLocation("test.yml") - ).apply(SigmaString("foobär")) + SigmaWideModifier(dummy_detection_item, [], SigmaRuleLocation("test.yml")).apply( + SigmaString("foobär") + ) def test_windash(dummy_detection_item): @@ -342,9 +340,7 @@ def test_re_startswith_endswith_wildcard(dummy_detection_item): def test_re_with_other(dummy_detection_item): - with pytest.raises( - SigmaValueError, match="only applicable to unmodified values.*test.yml" - ): + with pytest.raises(SigmaValueError, match="only applicable to unmodified values.*test.yml"): SigmaRegularExpressionModifier( dummy_detection_item, [SigmaBase64Modifier], SigmaRuleLocation("test.yml") ).modify(SigmaString("foo?bar.*")) @@ -371,33 +367,25 @@ def test_all(dummy_detection_item): def test_lt(dummy_detection_item): assert SigmaLessThanModifier(dummy_detection_item, []).modify( SigmaNumber(123) - ) == SigmaCompareExpression( - SigmaNumber(123), SigmaCompareExpression.CompareOperators.LT - ) + ) == SigmaCompareExpression(SigmaNumber(123), SigmaCompareExpression.CompareOperators.LT) def test_lte(dummy_detection_item): assert SigmaLessThanEqualModifier(dummy_detection_item, []).modify( SigmaNumber(123) - ) == SigmaCompareExpression( - SigmaNumber(123), SigmaCompareExpression.CompareOperators.LTE - ) + ) == SigmaCompareExpression(SigmaNumber(123), SigmaCompareExpression.CompareOperators.LTE) def test_gt(dummy_detection_item): assert SigmaGreaterThanModifier(dummy_detection_item, []).modify( SigmaNumber(123) - ) == SigmaCompareExpression( - SigmaNumber(123), SigmaCompareExpression.CompareOperators.GT - ) + ) == SigmaCompareExpression(SigmaNumber(123), SigmaCompareExpression.CompareOperators.GT) def test_gte(dummy_detection_item): assert SigmaGreaterThanEqualModifier(dummy_detection_item, []).modify( SigmaNumber(123) - ) == SigmaCompareExpression( - SigmaNumber(123), SigmaCompareExpression.CompareOperators.GTE - ) + ) == SigmaCompareExpression(SigmaNumber(123), SigmaCompareExpression.CompareOperators.GTE) def test_fieldref(dummy_detection_item): @@ -408,23 +396,21 @@ def test_fieldref(dummy_detection_item): def test_fieldref_wildcard(dummy_detection_item): with pytest.raises(SigmaValueError, match="must not contain wildcards"): - SigmaFieldReferenceModifier(dummy_detection_item, []).modify( - SigmaString("field*") - ) + SigmaFieldReferenceModifier(dummy_detection_item, []).modify(SigmaString("field*")) def test_exists(dummy_detection_item): dummy_detection_item.field = "test" - assert SigmaExistsModifier(dummy_detection_item, []).modify( - SigmaBool(True) - ) == SigmaExists(True) + assert SigmaExistsModifier(dummy_detection_item, []).modify(SigmaBool(True)) == SigmaExists( + True + ) def test_exists_without_field(dummy_detection_item): with pytest.raises(SigmaValueError, match="must be applied to field.*test.yml"): - SigmaExistsModifier( - dummy_detection_item, [], SigmaRuleLocation("test.yml") - ).modify(SigmaBool(True)) == SigmaExists(True) + SigmaExistsModifier(dummy_detection_item, [], SigmaRuleLocation("test.yml")).modify( + SigmaBool(True) + ) == SigmaExists(True) def test_exists_with_other(dummy_detection_item): @@ -445,9 +431,11 @@ def test_compare_string(dummy_detection_item): def test_expand(dummy_detection_item): - assert SigmaExpandModifier(dummy_detection_item, []).modify( - SigmaString("test%var%test") - ).s == ("test", Placeholder("var"), "test") + assert SigmaExpandModifier(dummy_detection_item, []).modify(SigmaString("test%var%test")).s == ( + "test", + Placeholder("var"), + "test", + ) def test_cidr(dummy_detection_item): @@ -457,9 +445,7 @@ def test_cidr(dummy_detection_item): def test_cidr_with_other(dummy_detection_item): - with pytest.raises( - SigmaValueError, match="only applicable to unmodified values.*test.yml" - ): + with pytest.raises(SigmaValueError, match="only applicable to unmodified values.*test.yml"): SigmaCIDRModifier( dummy_detection_item, [SigmaBase64Modifier], SigmaRuleLocation("test.yml") ).modify(SigmaString("192.168.1.0/24")) @@ -467,6 +453,6 @@ def test_cidr_with_other(dummy_detection_item): def test_cidr_invalid(dummy_detection_item): with pytest.raises(SigmaTypeError, match="Invalid CIDR expression.*test.yml"): - SigmaCIDRModifier( - dummy_detection_item, [], SigmaRuleLocation("test.yml") - ).modify(SigmaString("192.168.1.1/24")) + SigmaCIDRModifier(dummy_detection_item, [], SigmaRuleLocation("test.yml")).modify( + SigmaString("192.168.1.1/24") + ) diff --git a/tests/test_pipelines_common.py b/tests/test_pipelines_common.py index db72b77d..3424c005 100644 --- a/tests/test_pipelines_common.py +++ b/tests/test_pipelines_common.py @@ -104,9 +104,7 @@ def test_logsource_windows_network_connection_initiated(initiated, result): def test_generate_windows_logsource_items(): - items = generate_windows_logsource_items( - "logsource", "Windows:{source}", "test-{service}" - ) + items = generate_windows_logsource_items("logsource", "Windows:{source}", "test-{service}") assert items[0] == ProcessingItem( identifier="test-security", transformation=AddConditionTransformation({"logsource": "Windows:Security"}), diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 7ed65b83..4998c9f4 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -20,9 +20,7 @@ def test_autodiscover_backends(): - plugins = InstalledSigmaPlugins.autodiscover( - include_pipelines=False, include_validators=False - ) + plugins = InstalledSigmaPlugins.autodiscover(include_pipelines=False, include_validators=False) assert plugins == InstalledSigmaPlugins( backends={ "TextQueryTestBackend": TextQueryTestBackend, @@ -34,9 +32,7 @@ def test_autodiscover_backends(): def test_autodiscover_pipelines(): - plugins = InstalledSigmaPlugins.autodiscover( - include_backends=False, include_validators=False - ) + plugins = InstalledSigmaPlugins.autodiscover(include_backends=False, include_validators=False) assert plugins == InstalledSigmaPlugins( backends=dict(), pipelines={ @@ -49,9 +45,7 @@ def test_autodiscover_pipelines(): def test_autodiscover_validators(): - plugins = InstalledSigmaPlugins.autodiscover( - include_backends=False, include_pipelines=False - ) + plugins = InstalledSigmaPlugins.autodiscover(include_backends=False, include_pipelines=False) assert len(plugins.validators) > 10 @@ -143,12 +137,8 @@ def check_module(name: str) -> bool: def test_sigma_plugin_installation(): plugin_dir = SigmaPluginDirectory.default_plugin_directory() - plugin = plugin_dir.get_plugin_by_uuid( - "4af37b53-f1ec-4567-8017-2fb9315397a1" - ) # Splunk backend - assert not check_module( - "sigma.backends.splunk" - ) # ensure it's not already installed + plugin = plugin_dir.get_plugin_by_uuid("4af37b53-f1ec-4567-8017-2fb9315397a1") # Splunk backend + assert not check_module("sigma.backends.splunk") # ensure it's not already installed plugin.install() assert check_module("sigma.backends.splunk") plugin.uninstall() @@ -202,9 +192,7 @@ def test_sigma_plugin_directory_count(plugin_directory: SigmaPluginDirectory): def test_sigma_plugin_directory_get_by_uuid(plugin_directory: SigmaPluginDirectory): assert ( - plugin_directory.get_plugin_by_uuid( - UUID("09b0cefd-f3d9-49d2-894b-2920e10a9f73") - ).id + plugin_directory.get_plugin_by_uuid(UUID("09b0cefd-f3d9-49d2-894b-2920e10a9f73")).id == "test_pipeline" ) @@ -232,9 +220,7 @@ def test_sigma_plugin_directory_get_by_id(plugin_directory: SigmaPluginDirectory def test_sigma_plugin_directory_get_by_id_not_found( plugin_directory: SigmaPluginDirectory, ): - with pytest.raises( - SigmaPluginNotFoundError, match="Plugin with identifier.*not found" - ): + with pytest.raises(SigmaPluginNotFoundError, match="Plugin with identifier.*not found"): plugin_directory.get_plugin_by_id("not_existing") @@ -263,7 +249,4 @@ def test_sigma_plugin_directory_get_plugins_compatible( sigma_plugin = SigmaPlugin.from_dict(sigma_plugin_dict_incompatible) plugin_directory.register_plugin(sigma_plugin) - assert ( - plugin_directory.get_plugins(compatible_only=True) - < plugin_directory.get_plugins() - ) + assert plugin_directory.get_plugins(compatible_only=True) < plugin_directory.get_plugins() diff --git a/tests/test_processing_conditions.py b/tests/test_processing_conditions.py index 7f22ee00..dcb15343 100644 --- a/tests/test_processing_conditions.py +++ b/tests/test_processing_conditions.py @@ -59,9 +59,7 @@ def test_logsource_match(dummy_processing_pipeline, sigma_rule): def test_logsource_no_match(dummy_processing_pipeline, sigma_rule): - assert not LogsourceCondition( - category="test_category", product="other_product" - ).match( + assert not LogsourceCondition(category="test_category", product="other_product").match( dummy_processing_pipeline, sigma_rule, ) @@ -113,9 +111,7 @@ def test_include_field_condition_match(dummy_processing_pipeline, detection_item ) -def test_include_field_condition_match_nofield( - dummy_processing_pipeline, detection_item_nofield -): +def test_include_field_condition_match_nofield(dummy_processing_pipeline, detection_item_nofield): assert ( IncludeFieldCondition(["field", "otherfield"]).match_field_name( dummy_processing_pipeline, None @@ -278,9 +274,7 @@ def test_detection_item_processing_item_applied( def test_detection_item_processing_item_not_applied( dummy_processing_pipeline, processing_item, detection_item: SigmaDetectionItem ): - assert not DetectionItemProcessingItemAppliedCondition( - processing_item_id="test" - ).match( + assert not DetectionItemProcessingItemAppliedCondition(processing_item_id="test").match( dummy_processing_pipeline, detection_item, ) @@ -289,9 +283,7 @@ def test_detection_item_processing_item_not_applied( @pytest.fixture def pipeline_field_tracking(): pipeline = ProcessingPipeline() - pipeline.track_field_processing_items( - "field1", ["fieldA", "fieldB"], "processing_item" - ) + pipeline.track_field_processing_items("field1", ["fieldA", "fieldB"], "processing_item") return pipeline diff --git a/tests/test_processing_pipeline.py b/tests/test_processing_pipeline.py index abcba0f4..e54568e3 100644 --- a/tests/test_processing_pipeline.py +++ b/tests/test_processing_pipeline.py @@ -45,9 +45,7 @@ def match(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> bool: class DetectionItemConditionTrue(DetectionItemProcessingCondition): dummy: str - def match( - self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem - ) -> bool: + def match(self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem) -> bool: return True @@ -55,9 +53,7 @@ def match( class DetectionItemConditionFalse(DetectionItemProcessingCondition): dummy: str - def match( - self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem - ) -> bool: + def match(self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem) -> bool: return False @@ -289,9 +285,7 @@ def test_processingitem_apply(processing_item, dummy_processing_pipeline, sigma_ assert applied and sigma_rule.title == "TestTest" -def test_processingitem_apply_notapplied_all_with_false( - dummy_processing_pipeline, sigma_rule -): +def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipeline, sigma_rule): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), rule_condition_linking=all, @@ -328,9 +322,7 @@ def test_processingitem_apply_negated_false(dummy_processing_pipeline, sigma_rul assert applied and sigma_rule.title == "TestTest" -def test_processingitem_apply_notapplied_all_with_false( - dummy_processing_pipeline, sigma_rule -): +def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipeline, sigma_rule): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), rule_condition_linking=all, @@ -352,10 +344,7 @@ def test_processingitem_match_detection_item(dummy_processing_pipeline, detectio DetectionItemConditionFalse(dummy="test-false"), ], ) - assert ( - processing_item.match_detection_item(dummy_processing_pipeline, detection_item) - == True - ) + assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == True def test_processingitem_match_detection_item_all_with_false( @@ -369,10 +358,7 @@ def test_processingitem_match_detection_item_all_with_false( DetectionItemConditionFalse(dummy="test-false"), ], ) - assert ( - processing_item.match_detection_item(dummy_processing_pipeline, detection_item) - == False - ) + assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == False def test_processingitem_match_detection_item_any_without_true( @@ -386,10 +372,7 @@ def test_processingitem_match_detection_item_any_without_true( DetectionItemConditionFalse(dummy="test-false"), ], ) - assert ( - processing_item.match_detection_item(dummy_processing_pipeline, detection_item) - == False - ) + assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == False def test_processingitem_match_detection_item_negated_true( @@ -402,10 +385,7 @@ def test_processingitem_match_detection_item_negated_true( DetectionItemConditionTrue(dummy="test-true"), ], ) - assert ( - processing_item.match_detection_item(dummy_processing_pipeline, detection_item) - == False - ) + assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == False def test_processingitem_match_detection_item_negated_false( @@ -418,9 +398,7 @@ def test_processingitem_match_detection_item_negated_false( DetectionItemConditionFalse(dummy="test-false"), ], ) - assert processing_item.match_detection_item( - dummy_processing_pipeline, detection_item - ) + assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) def test_processingitem_rule_condition_nolist(): @@ -434,9 +412,7 @@ def test_processingitem_rule_condition_nolist(): def test_processingitem_detection_item_condition_nolist(): with pytest.raises(SigmaTypeError, match="Detection item processing conditions"): ProcessingItem( - detection_item_conditions=DetectionItemProcessingItemAppliedCondition( - "test" - ), + detection_item_conditions=DetectionItemProcessingItemAppliedCondition("test"), transformation=SetStateTransformation("test", True), ) @@ -556,12 +532,8 @@ def test_processingpipeline_error_direct_transofrmations(sigma_rule): def test_processingpipeline_apply(sigma_rule): pipeline = ProcessingPipeline( items=[ - ProcessingItem( - transformation=TransformationPrepend(s="Pre"), identifier="pre" - ), - ProcessingItem( - transformation=TransformationAppend(s="Appended"), identifier="append" - ), + ProcessingItem(transformation=TransformationPrepend(s="Pre"), identifier="pre"), + ProcessingItem(transformation=TransformationAppend(s="Appended"), identifier="append"), ] ) result_rule = pipeline.apply(sigma_rule) @@ -580,9 +552,7 @@ def test_processingpipeline_apply_partial(sigma_rule): rule_conditions=[RuleConditionFalse(dummy="test")], identifier="pre", ), - ProcessingItem( - transformation=TransformationAppend(s="Appended"), identifier="append" - ), + ProcessingItem(transformation=TransformationAppend(s="Appended"), identifier="append"), ] ) result_rule = pipeline.apply(sigma_rule) @@ -595,15 +565,11 @@ def test_processingpipeline_apply_partial(sigma_rule): def test_processingpipeline_field_processing_item_tracking(): pipeline = ProcessingPipeline() - pipeline.track_field_processing_items( - "field1", ["fieldA", "fieldB"], "processing_item_1" - ) + pipeline.track_field_processing_items("field1", ["fieldA", "fieldB"], "processing_item_1") pipeline.track_field_processing_items( "fieldA", ["fieldA", "fieldC", "fieldD"], "processing_item_2" ) - pipeline.track_field_processing_items( - "fieldB", ["fieldD", "fieldE"], "processing_item_3" - ) + pipeline.track_field_processing_items("fieldB", ["fieldD", "fieldE"], "processing_item_3") pipeline.track_field_processing_items("fieldE", ["fieldF"], None) assert pipeline.field_name_applied_ids == { "fieldA": {"processing_item_1", "processing_item_2"}, @@ -613,10 +579,7 @@ def test_processingpipeline_field_processing_item_tracking(): } assert pipeline.field_was_processed_by("fieldF", "processing_item_3") == True assert pipeline.field_was_processed_by("fieldF", "processing_item_2") == False - assert ( - pipeline.field_was_processed_by("nonexistingfield", "processing_item_2") - == False - ) + assert pipeline.field_was_processed_by("nonexistingfield", "processing_item_2") == False assert pipeline.field_was_processed_by(None, "processing_item_3") == False diff --git a/tests/test_processing_resolver.py b/tests/test_processing_resolver.py index 97acc24e..46f7749c 100644 --- a/tests/test_processing_resolver.py +++ b/tests/test_processing_resolver.py @@ -79,9 +79,7 @@ def pipeline_func(): def test_resolve_failed_not_found( processing_pipeline_resolver: ProcessingPipelineResolver, ): - with pytest.raises( - SigmaPipelineNotFoundError, match="pipeline.*notexisting.*not found" - ): + with pytest.raises(SigmaPipelineNotFoundError, match="pipeline.*notexisting.*not found"): processing_pipeline_resolver.resolve_pipeline("notexisting") diff --git a/tests/test_processing_transformations.py b/tests/test_processing_transformations.py index e81b8ffa..d9c07c5b 100644 --- a/tests/test_processing_transformations.py +++ b/tests/test_processing_transformations.py @@ -163,9 +163,9 @@ def test_field_mapping_from_dict(): "multi_mapping_2", ], } - assert FieldMappingTransformation.from_dict( - {"mapping": mapping} - ) == FieldMappingTransformation(mapping) + assert FieldMappingTransformation.from_dict({"mapping": mapping}) == FieldMappingTransformation( + mapping + ) @pytest.fixture @@ -217,9 +217,7 @@ def test_field_mapping(field_mapping_transformation_sigma_rule): def test_field_mapping_tracking(field_mapping_transformation_sigma_rule): transformation, sigma_rule = field_mapping_transformation_sigma_rule - detection_items = ( - sigma_rule.detection.detections["test"].detection_items[0].detection_items - ) + detection_items = sigma_rule.detection.detections["test"].detection_items[0].detection_items updated_detection_items = { detection_item.field: detection_item.was_processed_by("test") for detection_item in detection_items @@ -289,12 +287,8 @@ def test_field_prefix_mapping(dummy_pipeline): SigmaDetectionItem("mapped1.field", [], [SigmaString("value1")]), SigmaDetection( [ - SigmaDetectionItem( - "mapped2a.field", [], [SigmaString("value2")] - ), - SigmaDetectionItem( - "mapped2b.field", [], [SigmaString("value2")] - ), + SigmaDetectionItem("mapped2a.field", [], [SigmaString("value2")]), + SigmaDetectionItem("mapped2b.field", [], [SigmaString("value2")]), ], item_linking=ConditionOR, ), @@ -345,9 +339,7 @@ def test_drop_detection_item_transformation_with_set_state(sigma_rule: SigmaRule ProcessingItem( identifier="test", transformation=SetStateTransformation("state", "test"), - rule_conditions=[ - RuleContainsDetectionItemCondition("field2", "value2") - ], + rule_conditions=[RuleContainsDetectionItemCondition("field2", "value2")], ), ProcessingItem( transformation=DropDetectionItemTransformation(), @@ -392,14 +384,10 @@ def test_drop_detection_item_transformation_all(sigma_rule: SigmaRule, dummy_pip transformation = DropDetectionItemTransformation() processing_item = ProcessingItem( transformation, - field_name_conditions=[ - IncludeFieldCondition(fields=["field1", "field2", "field3"]) - ], + field_name_conditions=[IncludeFieldCondition(fields=["field1", "field2", "field3"])], ) transformation.apply(dummy_pipeline, sigma_rule) - assert ( - sigma_rule.detection.detections["test"].detection_items[0].detection_items == [] - ) + assert sigma_rule.detection.detections["test"].detection_items[0].detection_items == [] @pytest.fixture @@ -411,9 +399,7 @@ def add_fieldname_suffix_transformation(): ) -def test_add_fieldname_suffix( - dummy_pipeline, sigma_rule, add_fieldname_suffix_transformation -): +def test_add_fieldname_suffix(dummy_pipeline, sigma_rule, add_fieldname_suffix_transformation): add_fieldname_suffix_transformation.apply(dummy_pipeline, sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ @@ -463,21 +449,19 @@ def test_add_fieldname_suffix_tracking( identifier="test", ) processing_item.apply(dummy_pipeline, sigma_rule) - detection_items = ( - sigma_rule.detection.detections["test"].detection_items[0].detection_items - ) + detection_items = sigma_rule.detection.detections["test"].detection_items[0].detection_items assert detection_items == [ SigmaDetectionItem("field1.test", [], [SigmaString("value1")]), SigmaDetectionItem("field2", [], [SigmaString("value2")]), SigmaDetectionItem("field3", [], [SigmaString("value3")]), ] - assert [ - detection_item.was_processed_by("test") for detection_item in detection_items - ] == [True, False, False] + assert [detection_item.was_processed_by("test") for detection_item in detection_items] == [ + True, + False, + False, + ] assert sigma_rule.was_processed_by("test") - assert processing_item.transformation.pipeline.field_mappings == { - "field1": {"field1.test"} - } + assert processing_item.transformation.pipeline.field_mappings == {"field1": {"field1.test"}} @pytest.fixture @@ -489,9 +473,7 @@ def add_fieldname_prefix_transformation(): ) -def test_add_fieldname_prefix( - dummy_pipeline, sigma_rule, add_fieldname_prefix_transformation -): +def test_add_fieldname_prefix(dummy_pipeline, sigma_rule, add_fieldname_prefix_transformation): add_fieldname_prefix_transformation.apply(dummy_pipeline, sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ @@ -541,21 +523,19 @@ def test_add_fieldname_prefix_tracking( identifier="test", ) processing_item.apply(dummy_pipeline, sigma_rule) - detection_items = ( - sigma_rule.detection.detections["test"].detection_items[0].detection_items - ) + detection_items = sigma_rule.detection.detections["test"].detection_items[0].detection_items assert detection_items == [ SigmaDetectionItem("test.field1", [], [SigmaString("value1")]), SigmaDetectionItem("field2", [], [SigmaString("value2")]), SigmaDetectionItem("field3", [], [SigmaString("value3")]), ] - assert [ - detection_item.was_processed_by("test") for detection_item in detection_items - ] == [True, False, False] + assert [detection_item.was_processed_by("test") for detection_item in detection_items] == [ + True, + False, + False, + ] assert sigma_rule.was_processed_by("test") - assert processing_item.transformation.pipeline.field_mappings == { - "field1": {"test.field1"} - } + assert processing_item.transformation.pipeline.field_mappings == {"field1": {"test.field1"}} def test_fields_list_mapping_with_detection_item_condition(sigma_rule: SigmaRule): @@ -617,14 +597,10 @@ def test_wildcard_placeholders(dummy_pipeline, sigma_rule_placeholders: SigmaRul def test_wildcard_placeholders_include_and_exclude_error(): with pytest.raises(SigmaConfigurationError, match="exclusively"): - WildcardPlaceholderTransformation( - include=["included_field"], exclude=["excluded_field"] - ) + WildcardPlaceholderTransformation(include=["included_field"], exclude=["excluded_field"]) -def test_wildcard_placeholders_included( - dummy_pipeline, sigma_rule_placeholders: SigmaRule -): +def test_wildcard_placeholders_included(dummy_pipeline, sigma_rule_placeholders: SigmaRule): transformation = WildcardPlaceholderTransformation(include=["var1"]) transformation.set_processing_item( ProcessingItem( @@ -634,9 +610,7 @@ def test_wildcard_placeholders_included( ) transformation.apply(dummy_pipeline, sigma_rule_placeholders) detection_items = ( - sigma_rule_placeholders.detection.detections["test"] - .detection_items[0] - .detection_items + sigma_rule_placeholders.detection.detections["test"].detection_items[0].detection_items ) assert ( detection_items[0].value[0] == SigmaString("value*test") @@ -659,9 +633,7 @@ def test_wildcard_placeholders_included( ) -def test_wildcard_placeholders_excluded( - dummy_pipeline, sigma_rule_placeholders: SigmaRule -): +def test_wildcard_placeholders_excluded(dummy_pipeline, sigma_rule_placeholders: SigmaRule): transformation = WildcardPlaceholderTransformation(exclude=["var2", "var3"]) transformation.set_processing_item( ProcessingItem( @@ -671,9 +643,7 @@ def test_wildcard_placeholders_excluded( ) transformation.apply(dummy_pipeline, sigma_rule_placeholders) detection_items = ( - sigma_rule_placeholders.detection.detections["test"] - .detection_items[0] - .detection_items + sigma_rule_placeholders.detection.detections["test"].detection_items[0].detection_items ) assert ( detection_items[0].value[0] == SigmaString("value*test") @@ -696,9 +666,7 @@ def test_wildcard_placeholders_excluded( ) -def test_wildcard_placeholders_without_placeholders( - dummy_pipeline, sigma_rule: SigmaRule -): +def test_wildcard_placeholders_without_placeholders(dummy_pipeline, sigma_rule: SigmaRule): transformation = WildcardPlaceholderTransformation() transformation.apply(dummy_pipeline, sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( @@ -718,9 +686,7 @@ def test_valuelist_placeholders(sigma_rule_placeholders_simple: SigmaRule): transformation = ValueListPlaceholderTransformation() pipeline = ProcessingPipeline([], {"var1": ["val1", 123], "var2": "val3*"}) transformation.apply(pipeline, sigma_rule_placeholders_simple) - assert sigma_rule_placeholders_simple.detection.detections[ - "test" - ] == SigmaDetection( + assert sigma_rule_placeholders_simple.detection.detections["test"] == SigmaDetection( [ SigmaDetection( [ @@ -752,9 +718,7 @@ def test_valuelist_placeholders_wrong_type(sigma_rule_placeholders_simple: Sigma transformation.apply(pipeline, sigma_rule_placeholders_simple) -def test_queryexpr_placeholders( - dummy_pipeline, sigma_rule_placeholders_only: SigmaRule -): +def test_queryexpr_placeholders(dummy_pipeline, sigma_rule_placeholders_only: SigmaRule): expr = "{field} lookup {id}" transformation = QueryExpressionPlaceholderTransformation( expression=expr, mapping={"var2": "placeholder2"} @@ -788,9 +752,7 @@ def test_queryexpr_placeholders( ) -def test_queryexpr_placeholders_without_placeholders( - dummy_pipeline, sigma_rule: SigmaRule -): +def test_queryexpr_placeholders_without_placeholders(dummy_pipeline, sigma_rule: SigmaRule): transformation = QueryExpressionPlaceholderTransformation( expression="{field} lookup {id}", ) @@ -808,9 +770,7 @@ def test_queryexpr_placeholders_without_placeholders( ) -def test_queryexpr_placeholders_mixed_string( - dummy_pipeline, sigma_rule_placeholders: SigmaRule -): +def test_queryexpr_placeholders_mixed_string(dummy_pipeline, sigma_rule_placeholders: SigmaRule): transformation = QueryExpressionPlaceholderTransformation( expression="{field} lookup {id}", ) @@ -844,9 +804,7 @@ def test_conditiontransformation_tracking_change(dummy_pipeline, sigma_rule: Sig ) and sigma_rule.was_processed_by("test") -def test_conditiontransformation_tracking_nochange( - dummy_pipeline, sigma_rule: SigmaRule -): +def test_conditiontransformation_tracking_nochange(dummy_pipeline, sigma_rule: SigmaRule): transformation = DummyConditionTransformation(False) transformation.set_processing_item( ProcessingItem( @@ -887,16 +845,12 @@ def test_addconditiontransformation(dummy_pipeline, sigma_rule: SigmaRule): SigmaDetectionItem("newfield1", [], [SigmaString("test")]), SigmaDetectionItem("newfield2", [], [SigmaNumber(123)]), SigmaDetectionItem("newfield3", [], [SigmaString("$category")]), - SigmaDetectionItem( - "listfield", [], [SigmaString("value1"), SigmaString("value2")] - ), + SigmaDetectionItem("listfield", [], [SigmaString("value1"), SigmaString("value2")]), ] ) and all( # detection items are marked as processed by processing item detection_item.was_processed_by("test") - for detection_item in sigma_rule.detection.detections[ - "additional" - ].detection_items + for detection_item in sigma_rule.detection.detections["additional"].detection_items ) and sigma_rule.was_processed_by("test") ) @@ -927,16 +881,12 @@ def test_addconditiontransformation_template(dummy_pipeline, sigma_rule: SigmaRu [ # additional detection item referred by condition SigmaDetectionItem("newfield1", [], [SigmaString("test")]), SigmaDetectionItem("newfield2", [], [SigmaString("$something")]), - SigmaDetectionItem( - "listfield", [], [SigmaString("test"), SigmaString("value")] - ), + SigmaDetectionItem("listfield", [], [SigmaString("test"), SigmaString("value")]), ] ) and all( # detection items are marked as processed by processing item detection_item.was_processed_by("test") - for detection_item in sigma_rule.detection.detections[ - "additional" - ].detection_items + for detection_item in sigma_rule.detection.detections["additional"].detection_items ) and sigma_rule.was_processed_by("test") ) @@ -1008,9 +958,7 @@ def test_replace_string_wildcard(dummy_pipeline): def test_replace_string_invalid(): - with pytest.raises( - SigmaRegularExpressionError, match="Regular expression.*invalid" - ): + with pytest.raises(SigmaRegularExpressionError, match="Regular expression.*invalid"): ReplaceStringTransformation("*", "test") diff --git a/tests/test_rule.py b/tests/test_rule.py index 675a5aa1..c7e16a1f 100644 --- a/tests/test_rule.py +++ b/tests/test_rule.py @@ -55,9 +55,7 @@ def test_sigmaruletag_fromstr(): def test_sigmaruletag_fromstr_nodot(): - with pytest.raises( - sigma_exceptions.SigmaValueError, match="must start with namespace" - ): + with pytest.raises(sigma_exceptions.SigmaValueError, match="must start with namespace"): SigmaRuleTag.from_str("tag") @@ -112,12 +110,8 @@ def test_sigmalogsource_fromdict_no_service(): def test_sigmalogsource_empty(): - with pytest.raises( - sigma_exceptions.SigmaLogsourceError, match="can't be empty.*test.yml" - ): - SigmaLogSource( - None, None, None, source=sigma_exceptions.SigmaRuleLocation("test.yml") - ) + with pytest.raises(sigma_exceptions.SigmaLogsourceError, match="can't be empty.*test.yml"): + SigmaLogSource(None, None, None, source=sigma_exceptions.SigmaRuleLocation("test.yml")) def test_sigmalogsource_eq(): @@ -214,9 +208,10 @@ def test_sigmadetectionitem_keyword_list(): def test_sigmadetectionitem_keyword_list_to_plain(): """Keyword list detection.""" - assert SigmaDetectionItem( - None, [], [SigmaString("string"), SigmaNumber(123)] - ).to_plain() == ["string", 123] + assert SigmaDetectionItem(None, [], [SigmaString("string"), SigmaNumber(123)]).to_plain() == [ + "string", + 123, + ] def test_sigmadetectionitem_keyword_modifiers(): @@ -239,9 +234,7 @@ def test_sigmadetectionitem_keyword_modifiers_to_plain(): def test_sigmadetectionitem_unknown_modifier(): """Keyword detection with modifier chain.""" - with pytest.raises( - sigma_exceptions.SigmaModifierError, match="Unknown modifier.*test.yml" - ): + with pytest.raises(sigma_exceptions.SigmaModifierError, match="Unknown modifier.*test.yml"): SigmaDetectionItem.from_mapping( "|foobar", "foobar", source=sigma_exceptions.SigmaRuleLocation("test.yml") ) @@ -256,9 +249,7 @@ def test_sigmadetectionitem_key_value_single_string(): def test_sigmadetectionitem_key_value_single_string_to_plain(): """Key-value detection with one value.""" - assert SigmaDetectionItem("key", [], [SigmaString("value")]).to_plain() == { - "key": "value" - } + assert SigmaDetectionItem("key", [], [SigmaString("value")]).to_plain() == {"key": "value"} def test_sigmadetectionitem_key_value_single_string_modifier_to_plain(): @@ -266,9 +257,7 @@ def test_sigmadetectionitem_key_value_single_string_modifier_to_plain(): Key-value detection with one value and contains modifier converted to plain. Important: the original value should appear instead of the modified. """ - detection_item = SigmaDetectionItem( - "key", [SigmaContainsModifier], [SigmaString("value")] - ) + detection_item = SigmaDetectionItem("key", [SigmaContainsModifier], [SigmaString("value")]) detection_item.apply_modifiers() assert detection_item.to_plain() == {"key|contains": "value"} @@ -313,9 +302,9 @@ def test_sigmadetectionitem_key_value_single_regexp_to_plain(): def test_sigmadetectionitem_key_value_list(): """Key-value detection with value list.""" - assert SigmaDetectionItem.from_mapping( - "key", ["string", 123] - ) == SigmaDetectionItem("key", [], [SigmaString("string"), SigmaNumber(123)]) + assert SigmaDetectionItem.from_mapping("key", ["string", 123]) == SigmaDetectionItem( + "key", [], [SigmaString("string"), SigmaNumber(123)] + ) def test_sigmadetectionitem_key_value_list_to_plain(): @@ -355,9 +344,7 @@ def test_sigmadetectionitem_key_value_modifiers_to_plain(): def test_sigmadetectionitem_key_value_modifiers_invalid_re(): """Invalid regular expression modifier chain.""" - with pytest.raises( - sigma_exceptions.SigmaValueError, match="only applicable.*test.yml" - ): + with pytest.raises(sigma_exceptions.SigmaValueError, match="only applicable.*test.yml"): SigmaDetectionItem.from_mapping( "key|base64|re", "value", @@ -366,9 +353,7 @@ def test_sigmadetectionitem_key_value_modifiers_invalid_re(): def test_sigmadetectionitem_fromvalue(): - SigmaDetectionItem.from_value("test") == SigmaDetectionItem( - None, [], [SigmaString("test")] - ) + SigmaDetectionItem.from_value("test") == SigmaDetectionItem(None, [], [SigmaString("test")]) def test_sigmadetectionitem_processing_item_tracking(processing_item): @@ -397,12 +382,8 @@ def test_sigmadetection_detections(): assert ( SigmaDetection( [ - SigmaDetection( - [SigmaDetectionItem("key_1", [], [SigmaString("value_1")])] - ), - SigmaDetection( - [SigmaDetectionItem("key_2", [], [SigmaString("value_2")])] - ), + SigmaDetection([SigmaDetectionItem("key_1", [], [SigmaString("value_1")])]), + SigmaDetection([SigmaDetectionItem("key_2", [], [SigmaString("value_2")])]), ] ).item_linking == ConditionOR @@ -478,9 +459,7 @@ def test_sigmadetections_fromdict(): ), "test_list_of_maps": SigmaDetection( [ - SigmaDetection( - [SigmaDetectionItem("key1", [], [SigmaString("value1")])] - ), + SigmaDetection([SigmaDetectionItem("key1", [], [SigmaString("value1")])]), SigmaDetection([SigmaDetectionItem("key2", [], [SigmaNumber(2)])]), ] ), @@ -502,9 +481,7 @@ def test_sigmadetections_fromdict(): def test_sigmadetections_to_dict_single_condition(): assert SigmaDetections( detections={ - "test": SigmaDetection( - [SigmaDetectionItem("field", [], SigmaString("value"))] - ) + "test": SigmaDetection([SigmaDetectionItem("field", [], SigmaString("value"))]) }, condition=["test"], ).to_dict() == {"test": {"field": "value"}, "condition": "test"} @@ -513,9 +490,7 @@ def test_sigmadetections_to_dict_single_condition(): def test_sigmadetections_to_dict_single_condition(): assert SigmaDetections( detections={ - "test": SigmaDetection( - [SigmaDetectionItem("field", [], SigmaString("value"))] - ) + "test": SigmaDetection([SigmaDetectionItem("field", [], SigmaString("value"))]) }, condition=["test", "all of them"], ).to_dict() == {"test": {"field": "value"}, "condition": ["test", "all of them"]} @@ -544,9 +519,7 @@ def test_sigmadetections_index(): def test_sigmadetections_fromdict_no_detections(): - with pytest.raises( - sigma_exceptions.SigmaDetectionError, match="No detections.*test.yml" - ): + with pytest.raises(sigma_exceptions.SigmaDetectionError, match="No detections.*test.yml"): SigmaDetections.from_dict( {"condition": ["selection"]}, sigma_exceptions.SigmaRuleLocation("test.yml") ) @@ -685,12 +658,8 @@ def test_sigmadetection_multi_dict_to_plain_all_key_collision_list_values(): """Two field names exist in distinct detection items and have to be merged.""" assert SigmaDetection( detection_items=[ - SigmaDetectionItem( - "field|all", [], [SigmaString("value1"), SigmaString("value2")] - ), - SigmaDetectionItem( - "field|all", [], [SigmaString("value3"), SigmaString("value4")] - ), + SigmaDetectionItem("field|all", [], [SigmaString("value1"), SigmaString("value2")]), + SigmaDetectionItem("field|all", [], [SigmaString("value3"), SigmaString("value4")]), ] ).to_plain() == {"field|all": ["value1", "value2", "value3", "value4"]} @@ -699,9 +668,7 @@ def test_sigmadetection_multi_dict_to_plain_key_collision_all(): assert SigmaDetection( detection_items=[ SigmaDetectionItem("field", [], SigmaString("value1")), - SigmaDetectionItem( - "field|all", [], [SigmaString("value2"), SigmaString("value3")] - ), + SigmaDetectionItem("field|all", [], [SigmaString("value2"), SigmaString("value3")]), ] ).to_plain() == { "field": "value1", @@ -720,12 +687,8 @@ def test_sigmadetection_multi_dict_to_plain_key_collision_lists(): ): SigmaDetection( detection_items=[ - SigmaDetectionItem( - "field", [], [SigmaString("value1"), SigmaString("value2")] - ), - SigmaDetectionItem( - "field", [], [SigmaString("value3"), SigmaString("value4")] - ), + SigmaDetectionItem("field", [], [SigmaString("value1"), SigmaString("value2")]), + SigmaDetectionItem("field", [], [SigmaString("value3"), SigmaString("value4")]), ], source=sigma_exceptions.SigmaRuleLocation("test.yml"), ).to_plain() @@ -734,9 +697,7 @@ def test_sigmadetection_multi_dict_to_plain_key_collision_lists(): def test_sigmadetection_multi_dict_to_plain_key_collision_all_2single(): assert SigmaDetection( detection_items=[ - SigmaDetectionItem( - "field|all", [], [SigmaString("value1"), SigmaString("value2")] - ), + SigmaDetectionItem("field|all", [], [SigmaString("value1"), SigmaString("value2")]), SigmaDetectionItem("field", [], [SigmaString("value3")]), SigmaDetectionItem("field", [], [SigmaString("value4")]), ] @@ -754,20 +715,14 @@ def test_sigmadetection_lists_and_plain_to_plain(): assert SigmaDetection( detection_items=[ SigmaDetectionItem(None, [], SigmaString("value1")), - SigmaDetectionItem( - None, [], [SigmaString("value2"), SigmaString("value3")] - ), - SigmaDetectionItem( - None, [], [SigmaString("value4"), SigmaString("value5")] - ), + SigmaDetectionItem(None, [], [SigmaString("value2"), SigmaString("value3")]), + SigmaDetectionItem(None, [], [SigmaString("value4"), SigmaString("value5")]), ] ).to_plain() == ["value1", "value2", "value3", "value4", "value5"] def test_sigmadetection_dict_and_keyword_to_plain(): - with pytest.raises( - sigma_exceptions.SigmaValueError, match="Can't convert detection.*test.yml" - ): + with pytest.raises(sigma_exceptions.SigmaValueError, match="Can't convert detection.*test.yml"): SigmaDetection( detection_items=[ SigmaDetectionItem("field", [], SigmaString("value")), @@ -781,9 +736,7 @@ def test_sigmadetection_dict_and_keyword_to_plain(): def test_sigmarule_bad_uuid(): - with pytest.raises( - sigma_exceptions.SigmaIdentifierError, match="must be an UUID.*test.yml" - ): + with pytest.raises(sigma_exceptions.SigmaIdentifierError, match="must be an UUID.*test.yml"): SigmaRule.from_dict( {"id": "no-uuid"}, source=sigma_exceptions.SigmaRuleLocation("test.yml") ) @@ -793,9 +746,7 @@ def test_sigmarule_bad_level(): with pytest.raises( sigma_exceptions.SigmaLevelError, match="no valid Sigma rule level.*test.yml" ): - SigmaRule.from_dict( - {"level": "bad"}, source=sigma_exceptions.SigmaRuleLocation("test.yml") - ) + SigmaRule.from_dict({"level": "bad"}, source=sigma_exceptions.SigmaRuleLocation("test.yml")) def test_sigmarule_bad_status(): @@ -809,9 +760,7 @@ def test_sigmarule_bad_status(): def test_sigmarule_bad_date(): with pytest.raises(sigma_exceptions.SigmaDateError, match="Rule date.*test.yml"): - SigmaRule.from_dict( - {"date": "bad"}, source=sigma_exceptions.SigmaRuleLocation("test.yml") - ) + SigmaRule.from_dict({"date": "bad"}, source=sigma_exceptions.SigmaRuleLocation("test.yml")) def test_sigmarule_date(): @@ -1139,9 +1088,7 @@ def test_sigmarule_to_dict(sigma_rule: SigmaRule): def test_empty_detection(): - with pytest.raises( - sigma_exceptions.SigmaDetectionError, match="Detection is empty.*test.yml" - ): + with pytest.raises(sigma_exceptions.SigmaDetectionError, match="Detection is empty.*test.yml"): SigmaDetection([], sigma_exceptions.SigmaRuleLocation("test.yml")) diff --git a/tests/test_types.py b/tests/test_types.py index be7bf576..e875cc21 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -158,9 +158,9 @@ def test_string_placeholders_replace(): def callback(p): yield from ["A", SpecialChars.WILDCARD_MULTI] - assert SigmaString( - "test%var1%something%var2%end" - ).insert_placeholders().replace_placeholders(callback) == [ + assert SigmaString("test%var1%something%var2%end").insert_placeholders().replace_placeholders( + callback + ) == [ SigmaString("testAsomethingAend"), SigmaString("testAsomething*end"), SigmaString("test*somethingAend"), @@ -169,9 +169,11 @@ def callback(p): def test_string_placeholders_escape(): - assert SigmaString( - "\\%test1\\%test2\\%%var%\\%test3\\%" - ).insert_placeholders().s == ("%test1%test2%", Placeholder("var"), "%test3%") + assert SigmaString("\\%test1\\%test2\\%%var%\\%test3\\%").insert_placeholders().s == ( + "%test1%test2%", + Placeholder("var"), + "%test3%", + ) def test_string_contains_placeholders(): @@ -179,9 +181,7 @@ def test_string_contains_placeholders(): def test_string_contains_placeholders_none(): - assert ( - SigmaString("test1test2").insert_placeholders().contains_placeholder() == False - ) + assert SigmaString("test1test2").insert_placeholders().contains_placeholder() == False def test_string_contains_placeholders_included(): @@ -338,10 +338,7 @@ def test_strings_iter(): def test_strings_convert(): - assert ( - SigmaString("foo?\\*bar*").convert(add_escaped="f", filter_chars="o") - == "\\f?\\*bar*" - ) + assert SigmaString("foo?\\*bar*").convert(add_escaped="f", filter_chars="o") == "\\f?\\*bar*" def test_strings_convert_no_multiwildcard(): @@ -425,9 +422,7 @@ def test_string_index_slice_with_step(sigma_string): def test_cased_string(sigma_string): - assert SigmaCasedString.from_sigma_string(sigma_string) == SigmaCasedString( - "*Test*Str\\*ing*" - ) + assert SigmaCasedString.from_sigma_string(sigma_string) == SigmaCasedString("*Test*Str\\*ing*") def test_number_int(): @@ -563,17 +558,11 @@ def test_conversion_none(): def test_query_expression(): - assert ( - str(SigmaQueryExpression("test\\test*test?test[]", "id")) - == "test\\test*test?test[]" - ) + assert str(SigmaQueryExpression("test\\test*test?test[]", "id")) == "test\\test*test?test[]" def test_query_expression_finalize(): - assert ( - SigmaQueryExpression("{field} in list({id})", "id").finalize("xxx") - == "xxx in list(id)" - ) + assert SigmaQueryExpression("{field} in list({id})", "id").finalize("xxx") == "xxx in list(id)" def test_query_expression_finalize_nofield_error(): @@ -582,9 +571,7 @@ def test_query_expression_finalize_nofield_error(): def test_query_expression_to_plain(): - with pytest.raises( - SigmaValueError, match="can't be converted into a plain representation" - ): + with pytest.raises(SigmaValueError, match="can't be converted into a plain representation"): SigmaQueryExpression("test", "id").to_plain() @@ -609,9 +596,7 @@ def test_cidr_ipv6(): def test_cidr_to_plain(): - with pytest.raises( - SigmaValueError, match="can't be converted into a plain representation" - ): + with pytest.raises(SigmaValueError, match="can't be converted into a plain representation"): SigmaCIDRExpression("192.168.1.0/24").to_plain() @@ -668,9 +653,7 @@ def test_cidr_expand_ipv6_0(): def test_cidr_expand_ipv6_56(): - assert SigmaCIDRExpression("1234:5678:0:ab00::/56").expand(wildcard="*") == [ - "1234:5678:0:ab*" - ] + assert SigmaCIDRExpression("1234:5678:0:ab00::/56").expand(wildcard="*") == ["1234:5678:0:ab*"] def test_cidr_expand_ipv6_58(): @@ -683,9 +666,7 @@ def test_cidr_expand_ipv6_58(): def test_cidr_expand_ipv6_60(): - assert SigmaCIDRExpression("1234:5678:0:ab00::/60").expand(wildcard="*") == [ - "1234:5678:0:ab0*" - ] + assert SigmaCIDRExpression("1234:5678:0:ab00::/60").expand(wildcard="*") == ["1234:5678:0:ab0*"] def test_cidr_expand_ipv6_64(): @@ -704,9 +685,7 @@ def test_cidr_invalid(): def test_compare_to_plain(): - with pytest.raises( - SigmaValueError, match="can't be converted into a plain representation" - ): + with pytest.raises(SigmaValueError, match="can't be converted into a plain representation"): SigmaCompareExpression( SigmaNumber(123), SigmaCompareExpression.CompareOperators.LTE ).to_plain() diff --git a/tests/test_validation.py b/tests/test_validation.py index aec5ea9f..af2c949f 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -21,13 +21,9 @@ def validators(): return InstalledSigmaPlugins.autodiscover().validators -def test_sigmavalidator_validate_rules( - rule_with_id, rule_without_id, rules_with_id_collision -): +def test_sigmavalidator_validate_rules(rule_with_id, rule_without_id, rules_with_id_collision): rules = SigmaCollection([rule_with_id, rule_without_id, *rules_with_id_collision]) - validator = SigmaValidator( - {IdentifierExistenceValidator, IdentifierUniquenessValidator} - ) + validator = SigmaValidator({IdentifierExistenceValidator, IdentifierUniquenessValidator}) issues = validator.validate_rules(rules) assert issues == [ IdentifierExistenceIssue([rule_without_id]), @@ -37,9 +33,7 @@ def test_sigmavalidator_validate_rules( ] -def test_sigmavalidator_exclusions( - rule_with_id, rule_without_id, rules_with_id_collision -): +def test_sigmavalidator_exclusions(rule_with_id, rule_without_id, rules_with_id_collision): rules = SigmaCollection([rule_with_id, rule_without_id, *rules_with_id_collision]) exclusions = { UUID("32532a0b-e56c-47c9-bcbb-3d88bd670c37"): {IdentifierUniquenessValidator}, @@ -127,9 +121,7 @@ def test_sigmavalidator_fromdict_explicit_validator(validators): def test_sigmavalidator_fromdict_remove_nonexisting(validators): - with pytest.raises( - SigmaConfigurationError, match="Attempting to remove.*identifier_existence" - ): + with pytest.raises(SigmaConfigurationError, match="Attempting to remove.*identifier_existence"): SigmaValidator.from_dict( { "validators": [ @@ -142,9 +134,7 @@ def test_sigmavalidator_fromdict_remove_nonexisting(validators): def test_sigmavalidator_fromdict_unknown_validator_in_validators(validators): - with pytest.raises( - SigmaConfigurationError, match="Unknown validator 'non_existing'" - ): + with pytest.raises(SigmaConfigurationError, match="Unknown validator 'non_existing'"): SigmaValidator.from_dict( { "validators": [ @@ -156,9 +146,7 @@ def test_sigmavalidator_fromdict_unknown_validator_in_validators(validators): def test_sigmavalidator_fromdict_unknown_validator_in_exclusions(validators): - with pytest.raises( - SigmaConfigurationError, match="Unknown validator 'non_existing'" - ): + with pytest.raises(SigmaConfigurationError, match="Unknown validator 'non_existing'"): SigmaValidator.from_dict( { "validators": [ diff --git a/tests/test_validators.py b/tests/test_validators.py index febfa7d6..e8c40b4a 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -114,8 +114,7 @@ def rules_with_id_collision(): def test_validator_identifier_existence(rule_without_id): validator = IdentifierExistenceValidator() assert ( - validator.validate(rule_without_id) - == [IdentifierExistenceIssue([rule_without_id])] + validator.validate(rule_without_id) == [IdentifierExistenceIssue([rule_without_id])] and validator.finalize() == [] ) @@ -292,9 +291,7 @@ def test_validator_double_wildcard(): condition: sel """ ) - assert validator.validate(rule) == [ - DoubleWildcardIssue([rule], SigmaString("te**st")) - ] + assert validator.validate(rule) == [DoubleWildcardIssue([rule], SigmaString("te**st"))] def test_validator_double_wildcard_valid(): @@ -348,9 +345,7 @@ def test_validator_control_characters(): condition: sel """ ) - assert validator.validate(rule) == [ - ControlCharacterIssue([rule], SigmaString("\temp")) - ] + assert validator.validate(rule) == [ControlCharacterIssue([rule], SigmaString("\temp"))] def test_validator_wildcards_instead_of_contains(): @@ -492,9 +487,7 @@ def test_validator_all_without_contains(): assert validator.validate(rule) == [ AllWithoutContainsModifierIssue( [rule], - SigmaDetectionItem( - "field", [SigmaAllModifier], ["value1", "value2", "value3"] - ), + SigmaDetectionItem("field", [SigmaAllModifier], ["value1", "value2", "value3"]), ) ] @@ -744,9 +737,7 @@ def test_validator_duplicate_tags(): - attack.s0005 """ ) - assert validator.validate(rule) == [ - DuplicateTagIssue([rule], SigmaRuleTag("attack", "g0001")) - ] + assert validator.validate(rule) == [DuplicateTagIssue([rule], SigmaRuleTag("attack", "g0001"))] def test_validator_sysmon_insteadof_generic_logsource(): diff --git a/tools/update-mitre_attack.py b/tools/update-mitre_attack.py index 88e89284..650c84a7 100644 --- a/tools/update-mitre_attack.py +++ b/tools/update-mitre_attack.py @@ -40,9 +40,7 @@ def get_attack_id(refs): for stix_file in args.stix: stix = json.load(stix_file) for obj in stix["objects"]: # iterate over all STIX objects - if not ( - obj.get("revoked") or obj.get("x_mitre_deprecated") - ): # ignore deprecated items + if not (obj.get("revoked") or obj.get("x_mitre_deprecated")): # ignore deprecated items if (obj_type := obj.get("type")) is not None: if obj_type == "x-mitre-tactic": # Tactic tactic_id = get_attack_id(obj["external_references"]) @@ -73,13 +71,11 @@ def get_attack_id(refs): print("from typing import Dict, List", file=args.output) print(f'mitre_attack_version: str = "{ attack_version }"', file=args.output) print( - "mitre_attack_tactics: Dict[str, str] = " - + pformat(tactics, indent=4, sort_dicts=True), + "mitre_attack_tactics: Dict[str, str] = " + pformat(tactics, indent=4, sort_dicts=True), file=args.output, ) print( - "mitre_attack_techniques: Dict[str, str] = " - + pformat(techniques, indent=4, sort_dicts=True), + "mitre_attack_techniques: Dict[str, str] = " + pformat(techniques, indent=4, sort_dicts=True), file=args.output, ) print( @@ -93,7 +89,6 @@ def get_attack_id(refs): file=args.output, ) print( - "mitre_attack_software: Dict[str, str] = " - + pformat(software, indent=4, sort_dicts=True), + "mitre_attack_software: Dict[str, str] = " + pformat(software, indent=4, sort_dicts=True), file=args.output, )