diff --git a/README.md b/README.md index b2d9b30..ce77b35 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,12 @@ sigma plugin install logpoint pip3 install pysigma-backend-logpoint ``` +You can verify that the backend is registered with Sigma CLI via: + +```bash +sigma plugin list --plugin-type backend +``` + #### Converting Sigma Rules Once the packages are successfully installed, you can convert Sigma rules into Logpoint queries using the command below. For example, to convert the [Suspicious Process Masquerading As SvcHost.EXE](https://github.com/SigmaHQ/sigma/blob/598d29f811c1859ba18e05b8c419cc94410c9a55/rules/windows/process_creation/proc_creation_win_svchost_masqueraded_execution.yml) @@ -61,7 +67,7 @@ sigma convert -t logpoint -p logpoint_windows rules/windows/process_creation/pro ╭─ubuntu@ubuntu ╰─$ sigma convert -t logpoint -p logpoint_windows rules/windows/process_creation/proc_creation_win_svchost_masqueraded_execution.yml Parsing Sigma rules [####################################] 100% -label="Create" label="Process" "process"="*\svchost.exe" - ("process" IN ["C:\Windows\System32\svchost.exe", "C:\Windows\SysWOW64\svchost.exe"] OR file="svchost.exe") +label="Create" label="Process" "process"="*\svchost.exe" -("process" IN ["C:\Windows\System32\svchost.exe", "C:\Windows\SysWOW64\svchost.exe"] OR file="svchost.exe") ``` ## Limitations and Constraints diff --git a/examples/logpoint_conversion_samples.py b/examples/logpoint_conversion_samples.py new file mode 100644 index 0000000..e17d67c --- /dev/null +++ b/examples/logpoint_conversion_samples.py @@ -0,0 +1,183 @@ +from textwrap import dedent + +from sigma.backends.logpoint import Logpoint +from sigma.collection import SigmaCollection + + +SAMPLE_RULES = [ + ( + "Single selection", + """ + title: Simple equality + status: test + logsource: + product: test_product + category: test_category + detection: + sel: + fieldA: valueA + condition: sel + """, + ), + ( + "OR combination", + """ + title: OR combination + status: test + logsource: + product: test_product + category: test_category + detection: + sel1: + fieldA: valueA + sel2: + fieldB: valueB + condition: 1 of sel* + """, + ), + ( + "AND with lists", + """ + title: List expansion + status: test + logsource: + product: test_product + category: test_category + detection: + sel: + fieldA: + - valueA1 + - valueA2 + fieldB: + - valueB1 + - valueB2 + condition: sel + """, + ), + ( + "Field name with whitespace", + """ + title: Whitespace field + status: test + logsource: + product: test_product + category: test_category + detection: + sel: + field name: value + condition: sel + """, + ), + ( + "Compact NOT", + """ + title: Compact NOT + status: test + logsource: + product: test_product + category: test_category + detection: + sel1: + fieldA: valueA + sel2: + fieldB: valueB + condition: not (sel1 or sel2) + """, + ), + ( + "Null filter", + """ + title: Null filter + status: test + logsource: + product: test_product + category: test_category + detection: + filter: + fieldA: null + condition: not filter + """, + ), + ( + "Endswith and null filters", + """ + title: Endswith null filters + status: test + logsource: + product: test_product + category: test_category + detection: + selection: + FieldA|endswith: 'valueA' + filter_1: + FieldB: null + filter_2: + FieldB: '' + condition: selection and not filter_1 and not filter_2 + """, + ), + ( + "CIDR", + """ + title: CIDR filter + status: test + logsource: + product: test_product + category: test_category + detection: + sel: + field|cidr: + - 192.168.0.0/16 + - 10.0.0.0/8 + fieldB: foo + fieldC: bar + condition: sel + """, + ), + ( + "Regex", + """ + title: Regex test + status: test + logsource: + product: test_product + category: test_category + detection: + sel: + fieldA|re: foo.*bar + fieldB: foo + condition: sel + """, + ), + ( + "Contains all", + """ + title: Contains all + status: test + logsource: + product: test_product + category: test_category + detection: + sel: + fieldA|contains|all: + - valueA + - valueB + condition: sel + """, + ), +] + + +def convert_samples() -> None: + backend = Logpoint() + for index, (title, yaml_rule) in enumerate(SAMPLE_RULES, start=1): + collection = SigmaCollection.from_yaml(dedent(yaml_rule)) + conversions = backend.convert(collection) + print(f"{index}. {title}:") + for conversion in conversions: + print(f" {conversion}") + print() + + +if __name__ == "__main__": + convert_samples() diff --git a/pyproject.toml b/pyproject.toml index 54e0f8c..46541f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,14 @@ pytest-cov = "^5.0.0" coverage = "^7.5.3" defusedxml = "^0.7.1" +[tool.poetry.plugins."sigma.backends"] +logpoint = "sigma.backends.logpoint:backends" + +[tool.poetry.plugins."sigma.pipelines"] +logpoint_windows = "sigma.pipelines.logpoint:pipelines" +logpoint_o365 = "sigma.pipelines.logpoint:pipelines" +logpoint_azure = "sigma.pipelines.logpoint:pipelines" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/sigma/backends/logpoint/logpoint.py b/sigma/backends/logpoint/logpoint.py index 3a2552c..49cb269 100644 --- a/sigma/backends/logpoint/logpoint.py +++ b/sigma/backends/logpoint/logpoint.py @@ -11,6 +11,7 @@ ) from sigma.conversion.base import TextQueryBackend from sigma.conversion.deferred import ( + DeferredQueryExpression, DeferredTextQueryExpression, ) from sigma.conversion.state import ConversionState @@ -271,3 +272,23 @@ def convert_condition(self, cond: ConditionType, state: ConversionState) -> Any: elif isinstance(cond, ConditionFieldEqualsValueExpression): self.modify_condition_from_json_value_construction(cond) return super().convert_condition(cond, state) + + def convert_condition_not( + self, cond: ConditionNOT, state: ConversionState + ) -> Union[str, DeferredQueryExpression]: + """Conversion of NOT conditions.""" + arg = cond.args[0] + try: + if ( + arg.__class__ in self.precedence + ): # group if AND or OR condition is negated + grouped = self.convert_condition_group(arg, state) + return f"{self.not_token}{grouped.lstrip()}" + expr = self.convert_condition(arg, state) + if isinstance( + expr, DeferredQueryExpression + ): # negate deferred expression and pass it to parent + return expr.negate() + return f"{self.not_token}{str(expr).lstrip()}" # convert negated expression to string + except TypeError: # pragma: no cover + raise NotImplementedError("Operator 'not' not supported by the backend") diff --git a/tests/test_backend_logpoint.py b/tests/test_backend_logpoint.py index a058fe9..b7c7b44 100644 --- a/tests/test_backend_logpoint.py +++ b/tests/test_backend_logpoint.py @@ -205,9 +205,7 @@ def test_logpoint_not_filter_null_and(logpoint_backend: Logpoint): """ ) - assert logpoint_backend.convert(rule) == [ - 'FieldA="*valueA" - FieldB!=* - FieldB=""' - ] + assert logpoint_backend.convert(rule) == ['FieldA="*valueA" -FieldB!=* -FieldB=""'] def test_logpoint_filter_null_and(logpoint_backend: Logpoint): @@ -229,7 +227,7 @@ def test_logpoint_filter_null_and(logpoint_backend: Logpoint): """ ) - assert logpoint_backend.convert(rule) == ['FieldA="*valueA" FieldB!=* - FieldB=""'] + assert logpoint_backend.convert(rule) == ['FieldA="*valueA" FieldB!=* -FieldB=""'] def test_logpoint_not_filter_null_or(logpoint_backend: Logpoint): @@ -252,7 +250,7 @@ def test_logpoint_not_filter_null_or(logpoint_backend: Logpoint): ) assert logpoint_backend.convert(rule) == [ - 'FieldA="*valueA" - FieldB!=* OR - FieldB=""' + 'FieldA="*valueA" -FieldB!=* OR -FieldB=""' ] @@ -276,7 +274,7 @@ def test_logpoint_filter_null_or(logpoint_backend: Logpoint): ) assert logpoint_backend.convert(rule) == [ - 'FieldA="*valueA" FieldB!=* OR - FieldB=""' + 'FieldA="*valueA" FieldB!=* OR -FieldB=""' ] @@ -300,7 +298,7 @@ def test_logpoint_filter_not_or_null(logpoint_backend: Logpoint): ) assert logpoint_backend.convert(rule) == [ - 'FieldA="*valueA" - (FieldB!=* OR FieldB="")' + 'FieldA="*valueA" -(FieldB!=* OR FieldB="")' ] @@ -319,7 +317,50 @@ def test_logpoint_filter_not(logpoint_backend: Logpoint): """ ) - assert logpoint_backend.convert(rule) == ["- Field!=*"] + assert logpoint_backend.convert(rule) == ["-Field!=*"] + + +def test_logpoint_compact_not_grouping(logpoint_backend: Logpoint): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: valueA + sel2: + fieldB: valueB + condition: not (sel1 or sel2) + """ + ) + + assert logpoint_backend.convert(rule) == ['-(fieldA="valueA" OR fieldB="valueB")'] + + +def test_logpoint_not_spacing(logpoint_backend: Logpoint): + rule = SigmaCollection.from_yaml( + """ + title: Test NOT spacing + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: valueA + sel2: + fieldB: valueB + condition: not (sel1 or sel2) + """ + ) + + converted = logpoint_backend.convert(rule)[0] + + assert "- (" not in converted + assert converted.startswith("-(") def test_logpoint_angle_brackets(logpoint_backend: Logpoint): diff --git a/tests/test_pipelines_windows.py b/tests/test_pipelines_windows.py index fa8c490..b85ad9b 100644 --- a/tests/test_pipelines_windows.py +++ b/tests/test_pipelines_windows.py @@ -161,7 +161,7 @@ def test_logpoint_windows_variable_mapping_registry_event(): ) ) == [ - 'event_id IN [12, 13, 14] norm_id="WindowsSysmon" target_object="*System\CurrentControlSet\Services\VSS*" "process"="*esentutl.exe" - target_object="*System\CurrentControlSet\Services\VSS\Start*"' + 'event_id IN [12, 13, 14] norm_id="WindowsSysmon" target_object="*System\CurrentControlSet\Services\VSS*" "process"="*esentutl.exe" -target_object="*System\CurrentControlSet\Services\VSS\Start*"' ] )